aiokafka producer running on one core in a python:buster Docker container achieves very low throughput of ~1 MiB per second


I have some simple code to test the performance of the aiokafka library. I am on a Windows computer, running Docker for Windows, inside a virtual machine with 8 cores.

The aiokafka library seems to achieve absurdly low producer throughput in this setup, about 1 MiB per second:

import asyncio
import cProfile
import io
import pstats
import time
from multiprocessing import Process

import uvloop
from aiokafka import AIOKafkaProducer

import bigass_object_factory  # our own factory; create_bytearray() returns a ~1 MiB payload


async def send_many():
  f = bigass_object_factory.BigAssObjectFactory()
  v = f.create_bytearray()
  print(f'{len(v)} length {type(v)} named v')

  producer = AIOKafkaProducer(
    bootstrap_servers='kafka:9092',
    loop=asyncio.get_event_loop(),
    acks=0,
  )

  pr = cProfile.Profile()
  pr.enable()

  await producer.start()

  total_sent = 0
  started = time.time()

  for i in range(100):
    await producer.send('test_topic_aiokafka', value=v)
    total_sent += len(v)
    print(f'sent {i}')

  await producer.stop()

  elapsed = time.time() - started
  print(f'{total_sent / elapsed / 2**20:.2f} MiB/s over {elapsed:.1f} s')

  pr.disable()
  s = io.StringIO()
  sortby = 'cumtime'
  ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
  ps.print_stats()
  print(s.getvalue())

def send_many_main():
  asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  asyncio.get_event_loop().run_until_complete(send_many())

if __name__ == '__main__':
  p1 = Process(target=send_many_main)
  p1.start()
  p1.join()

This takes 60+ seconds to run on my computer, i.e. 60+ seconds to send 100 one-MiB messages. I know that's a large message for Kafka, but this is ridiculous, and I feel fairly certain this is not the expected performance. When I profile with cProfile, I see this:
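To make the arithmetic explicit (using the ~70 s total from the profile that follows), the observed throughput works out to roughly the 1 MiB/s in the title:

```python
# Back-of-the-envelope check on the numbers above.
total_bytes = 100 * 1024 * 1024   # 100 messages of 1 MiB each
elapsed = 70.0                    # seconds, from the cProfile cumtime

mib_per_s = total_bytes / elapsed / (1024 * 1024)
print(f'{mib_per_s:.2f} MiB/s')  # prints 1.43 MiB/s
```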

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      169    0.003    0.000   70.619    0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:260(_send_produce_req)
      169    0.008    0.000   70.615    0.418 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:732(do)
      100    0.015    0.000   70.132    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/sender.py:701(create_request)
      100    0.003    0.000   70.103    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:236(get_data_buffer)
      100    0.001    0.000   70.101    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py:83(_build)
      100    0.000    0.000   70.099    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:526(build)
      100    0.119    0.001   70.099    0.701 /usr/local/lib/python3.9/site-packages/aiokafka/record/default_records.py:482(write_header)
      100    0.000    0.000   69.968    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/util.py:115(calc_crc32c_py)
      100    0.001    0.000   69.968    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:130(crc)
      100   69.967    0.700   69.967    0.700 /usr/local/lib/python3.9/site-packages/aiokafka/record/_crc32c.py:100(crc_update)
      169    0.004    0.000    0.473    0.003 /usr/local/lib/python3.9/site-packages/aiokafka/client.py:460(send)
      104    0.104    0.001    0.466    0.004 /usr/local/lib/python3.9/site-packages/aiokafka/conn.py:374(send)
      208    0.001    0.000    0.294    0.001 /usr/local/lib/python3.9/site-packages/kafka/util.py:155(__call__)
      208    0.004    0.000    0.293    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/struct.py:40(_encode_self)
  408/208    0.025    0.000    0.289    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:143(encode)
      107    0.002    0.000    0.270    0.003 /opt/project/tests/contrib/aiokafka/test_aiokafka_performance.py:34(send_many)
  408/208    0.003    0.000    0.237    0.001 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:146(<listcomp>)
  202/102    0.002    0.000    0.233    0.002 /usr/local/lib/python3.9/site-packages/kafka/protocol/types.py:181(encode)

So it's spending all its time in _crc32c.py. When I looked into the code, I concluded that there is no way to avoid this being called on the producer side; as far as I can tell, Kafka's record batch format requires the producer to include a CRC-32C checksum over each batch. (Obviously, if you cannot tell by now, I am not a Kafka expert and have no idea why this check is necessary.)

But it seems possible that the reason this is so slow is that aiokafka is using the pure-Python implementation of crc32c rather than a native one. That's what I am hoping, at least...
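To gauge whether a pure-Python CRC could plausibly account for ~0.7 s per 1 MiB message, here is a quick stdlib-only comparison: a table-driven CRC-32C written in plain Python (my own sketch, not aiokafka's _crc32c.py, which is a port of Google's Snappy implementation and structured differently) versus zlib.crc32, which runs in C (different polynomial, but it checksums the same amount of input):

```python
import time
import zlib

# Table-driven CRC-32C (Castagnoli, reflected polynomial 0x82F63B78).
# This is an illustrative pure-Python implementation, NOT aiokafka's code.
_POLY = 0x82F63B78
_TABLE = []
for i in range(256):
    crc = i
    for _ in range(8):
        crc = (crc >> 1) ^ _POLY if crc & 1 else crc >> 1
    _TABLE.append(crc)

def crc32c_py(data: bytes) -> int:
    crc = 0xFFFFFFFF
    for b in data:
        crc = _TABLE[(crc ^ b) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF

# Standard check value for CRC-32C: crc of b"123456789" is 0xE3069283.
assert crc32c_py(b'123456789') == 0xE3069283

payload = bytes(1024 * 1024)  # 1 MiB, like each message in the test above

t0 = time.perf_counter()
crc32c_py(payload)
py_s = time.perf_counter() - t0

t0 = time.perf_counter()
zlib.crc32(payload)
c_s = time.perf_counter() - t0

print(f'pure Python: {py_s:.3f} s/MiB, C (zlib): {c_s:.6f} s/MiB')
```

On my machine the pure-Python version is orders of magnitude slower than the C one, which is at least consistent with the ~0.7 s per message that crc_update shows in the profile.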

So my questions are:

  1. Does this indicate that crc32c is the problem?
  2. How can I make sure that I am using the native crc32c?
  3. Has anyone seen this before when using aiokafka with Python 3.9.x on a buster image?
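For question 2, here is the kind of diagnostic I have been running inside the container. It assumes two things about aiokafka's internals that I have not fully verified: that an AIOKAFKA_NO_EXTENSIONS environment variable forces the pure-Python fallback, and that the compiled (Cython) extensions ship as .so files inside the installed package:

```shell
# AIOKAFKA_NO_EXTENSIONS=1 supposedly forces the pure-Python code paths,
# so make sure it is not set (variable name is my assumption -- verify
# against your installed aiokafka version).
env | grep AIOKAFKA_NO_EXTENSIONS || echo "AIOKAFKA_NO_EXTENSIONS not set"

# List any compiled extension modules inside the installed package; no .so
# files here would suggest the pure-Python paths (like _crc32c.py) are in use.
pkg_dir=$(python -c "import aiokafka, os; print(os.path.dirname(aiokafka.__file__))" 2>/dev/null)
if [ -n "$pkg_dir" ]; then
  find "$pkg_dir" -name '*.so'
else
  echo "aiokafka is not installed in this environment"
fi
```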