Flink's map function seems to run in batch mode


I'm new to Flink (with Python). Recently I ran into a problem: I believe (and have actually verified) that the map function runs in a batch-like fashion even though I set the environment to streaming mode. For example, if I write my code as follows:

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)

source = KafkaSource.builder() \
        .set_bootstrap_servers('kafka:9092') \
        .set_topics('topic') \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds = ds.map(lambda i : i)
ds.print()
env.execute()

and my code that sends messages to the Kafka topic looks like this (just an example); it sends a record to the topic every 0.5 seconds:

from json import dumps
from time import sleep
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                value_serializer=lambda x: dumps(x).encode('utf-8'))
while True:
    cur_data = {...}
    producer.send(topic, value=cur_data)
    sleep(0.5)

Then the result was not what I expected. I expected each record to be processed by map and the downstream functions (key_by, reduce, and so on) as soon as it was polled by the source. But in fact, when the very first record arrives, it is not processed immediately; instead, it waits until a number of records have accumulated into a batch, and then the whole batch is sent to the map function to be processed. This confuses me a lot.

After testing, I believe batches are produced at a constant interval of 2 seconds. If I run the first code block, 4 records are printed to stdout every 2 seconds, and the timestamps at which map processed them are almost identical. If I change the interval for sending records to the Kafka topic to 0.1 seconds, 20 records are printed every 2 seconds, again with nearly identical timestamps. If I delete this line from the first block:

ds = ds.map(lambda i : i)

which means the map function is no longer executed, then each record is printed to stdout as soon as it is sent to the topic and polled by the source. So I believe the problem lies in the map function. Can anybody explain this to me? Thanks a lot; it has cost me too much time.
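To illustrate what I mean, the grouping I observed can be modeled with a toy sketch (my own simplification, not Flink's actual implementation; the function name and bundle size here are made up):

```python
def split_into_bundles(records, bundle_size):
    """Toy model: buffer records and release them one whole bundle at a time."""
    bundles, current = [], []
    for record in records:
        current.append(record)
        if len(current) >= bundle_size:
            bundles.append(current)  # the whole bundle is handed over at once
            current = []
    if current:  # any leftover records are flushed at the end
        bundles.append(current)
    return bundles

# One record every 0.5 s with a 2 s flush puts 4 records in each bundle:
print(split_into_bundles(list(range(8)), 4))  # → [[0, 1, 2, 3], [4, 5, 6, 7]]
```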

1 Answer

Well, I think I've found the solution. It's about Flink's configuration settings. By default, Flink sets the number of records per batch to 1000 via the configuration option python.fn-execution.bundle.size. After I set this to 1, my program processes each record as soon as it arrives. To be honest, I still haven't found the configuration option that corresponds to the 2-second interval mentioned above, but my problem is well fixed.
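For completeness, here is a sketch of how the option can be set when building the environment (assuming PyFlink 1.15+, where get_execution_environment accepts a Configuration; the related time-based option python.fn-execution.bundle.time is my guess at what drives the flush interval, so treat it as an assumption):

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

config = Configuration()
# Force each bundle to hold a single record so elements are processed as
# soon as they arrive (this trades throughput for latency).
config.set_integer("python.fn-execution.bundle.size", 1)
# Assumption: bundles are also flushed on a timer; lowering this value
# (milliseconds) may reduce the batching delay as well.
config.set_integer("python.fn-execution.bundle.time", 100)

env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
```

Note that reducing the bundle size this aggressively adds serialization overhead per record, so it is a latency/throughput trade-off rather than a free fix.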