I have some simple code to test the performance of the aiokafka
library. I am on a Windows computer, running Docker for Windows, with a virtual machine that has 8 cores.
In this setup, aiokafka
achieves absurdly low producer throughput, about 1 MiB per second:
```python
import asyncio
import cProfile
import io
import pstats
import time
from multiprocessing import Process

import uvloop
from aiokafka import AIOKafkaProducer

import bigass_object_factory  # local helper that builds a ~1 MiB bytearray


async def send_many():
    f = bigass_object_factory.BigAssObjectFactory()
    v = f.create_bytearray()
    print(f'{len(v)} length {type(v)} named v')
    producer = AIOKafkaProducer(
        bootstrap_servers='kafka:9092',
        loop=asyncio.get_event_loop(),
        acks=0,
    )
    pr = cProfile.Profile()
    pr.enable()
    await producer.start()
    total_sent = 0
    started = time.time()
    for i in range(100):
        await producer.send('test_topic_aiokafka', value=v)
        print(f'sent {i}')
    await producer.stop()
    pr.disable()
    s = io.StringIO()
    sortby = 'cumtime'
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())


def send_many_main():
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.get_event_loop().run_until_complete(send_many())


p1 = Process(target=send_many_main)
p1.start()
p1.join()
```
This takes 60+ seconds to run on my computer, i.e. 60+ seconds to send 100 1 MiB messages. I know that's a big message for Kafka, but this is ridiculous, and I feel fairly certain this is not the expected performance. When I profile using cProfile
I see this:
ncalls tottime percall cumtime percall filename:lineno(function)
169 0.003 0.000 70.619 0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:260(_send_produce_req)
169 0.008 0.000 70.615 0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:732(do)
100 0.015 0.000 70.132 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:701(create_request)
100 0.003 0.000 70.103 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:236(get_data_buffer)
100 0.001 0.000 70.101 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:83(_build)
100 0.000 0.000 70.099 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:526(build)
100 0.119 0.001 70.099 0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:482(write_header)
100 0.000 0.000 69.968 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/util.py:115(calc_crc32c_py)
100 0.001 0.000 69.968 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:130(crc)
100 69.967 0.700 69.967 0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:100(crc_update)
169 0.004 0.000 0.473 0.003 /usr/local/lib/python3.9/site-packages/aiokafka/client.py:460(send)
104 0.104 0.001 0.466 0.004 /usr/local/lib/python3.9/site-packages/aiokafka/conn.py:374(send)
208 0.001 0.000 0.294 0.001 /usr/local/lib/python3.9/site-packages/kafka/util.py:155(__call__)
208 0.004 0.000 0.293 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/struct.py:40(_encode_self)
408/208 0.025 0.000 0.289 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:143(encode)
107 0.002 0.000 0.270 0.003 /opt/project/tests/contrib/aiokafka/test_aiokafka_performance.py:34(send_many)
408/208 0.003 0.000 0.237 0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:146(<listcomp>)
202/102 0.002 0.000 0.233 0.002 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:181(encode)
So it's spending all its time in _crc32c.py
. When I looked into this code, I concluded that there is no way to avoid it being called on the producer side. (Obviously, as you can tell by now, I am not a Kafka expert and have no idea why this checksum is necessary.)
But it seems possible that the reason this is so slow is that it is using the pure-Python version of crc32c
rather than a native one. That's what I am hoping, at least...
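To sanity-check that a byte-at-a-time Python checksum really can be this slow, here is a rough stdlib-only comparison I put together: zlib.crc32 (implemented in C) against a naive table-driven CRC-32 loop in Python. (Note this is CRC-32, not the CRC-32C that Kafka uses; the polynomial differs, but the per-byte Python-loop cost should be representative of what _crc32c.py is paying.)

```python
import time
import zlib

# Standard table-driven CRC-32 (reflected, polynomial 0xEDB88320),
# written as a pure-Python per-byte loop -- the same shape of work
# the profile shows inside aiokafka/record/_crc32c.py.
_TABLE = []
for n in range(256):
    c = n
    for _ in range(8):
        c = (c >> 1) ^ 0xEDB88320 if c & 1 else c >> 1
    _TABLE.append(c)


def crc32_py(data: bytes) -> int:
    crc = 0xFFFFFFFF
    for b in data:
        crc = (crc >> 8) ^ _TABLE[(crc ^ b) & 0xFF]
    return crc ^ 0xFFFFFFFF


payload = b'\x5a' * (1024 * 1024)  # 1 MiB, like one of my messages

t0 = time.perf_counter()
c_result = zlib.crc32(payload)
t_c = time.perf_counter() - t0

t0 = time.perf_counter()
py_result = crc32_py(payload)
t_py = time.perf_counter() - t0

assert c_result == py_result  # same checksum, wildly different cost
print(f'C zlib.crc32: {t_c * 1000:8.3f} ms')
print(f'pure Python:  {t_py * 1000:8.3f} ms ({t_py / t_c:.0f}x slower)')
```

On my machine the pure-Python loop is several orders of magnitude slower per MiB, which lines up with the ~0.7 s per message the profile attributes to crc_update.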
So my questions are:
- Does this indicate that crc32c is the problem?
- How can I make sure that I am using the native crc32c?
- Has anyone seen this with respect to using aiokafka on python:buster-3.9.x before?
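For the second question, the only check I have come up with so far is probing which modules are importable. The candidate names below are guesses based on the paths in my profile output (aiokafka/record/_crc32c.py is clearly the pure-Python fallback), so treat them as hypothetical rather than confirmed aiokafka internals:

```python
import importlib.util


def importable(name: str) -> bool:
    """Return True if `name` can be located, without actually importing it."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        return False


# Candidate module names -- guesses, not confirmed aiokafka internals.
# The pure-Python fallback shows up in my profile as
# aiokafka/record/_crc32c.py; a compiled alternative is what I would
# *want* to find installed.
for name in ('aiokafka.record._crc32c', 'crc32c'):
    print(f'{name}: {"found" if importable(name) else "missing"}')
```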