Problems with memory usage in Flink


I have a Flink program that looks like this:

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)

# Read raw string records from the 'HPT1' Kafka topic
source1 = KafkaSource.builder() \
        .set_bootstrap_servers('kafka:9092') \
        .set_topics('HPT1') \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

ds = env.from_source(source1, WatermarkStrategy.no_watermarks(), "Kafka Source")

# Parse each record into a tuple, key by the second field,
# and fold the values of each key with mean_sqrt
ds = ds.map(lambda i: mapping_func(i)) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: mean_sqrt(i, j))
ds.print()
env.execute()

where mapping_func is a simple function that returns a tuple, and mean_sqrt computes the mean square of the data from the source. Using reduce may not be the best fit here, but the results of the calculation suit me well.
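For context, a simplified sketch of what the two helpers do (the field layout here is illustrative, not my real code):

def mapping_func(msg):
    # parse a raw Kafka string into (squared value, key, count)
    value, key = msg.split(',')
    return (float(value) ** 2, key, 1)

def mean_sqrt(i, j):
    # accumulate the sum of squares and the count for one key;
    # the mean square is the first field divided by the third
    return (i[0] + j[0], i[1], i[2] + j[2])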

Never mind these details; my question is about the program's memory usage. I notice that when Flink is up but no job is running, the TaskManager occupies about 200 MB of memory. When I submit the job above, memory first grows to about 400 MB, then climbs slowly toward an upper bound of roughly 650 MB, and finally fluctuates around that level.
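One way to watch these numbers continuously is to poll the TaskManager's JVM heap metric through the Flink REST API; a minimal sketch, assuming the default REST port 8081 and the built-in Status.JVM.Memory.Heap.Used metric:

import time
import requests

BASE = 'http://localhost:8081'  # default Flink REST endpoint

# look up the TaskManager id, then poll its JVM heap usage
tm_id = requests.get(f'{BASE}/taskmanagers').json()['taskmanagers'][0]['id']
while True:
    heap = requests.get(
        f'{BASE}/taskmanagers/{tm_id}/metrics',
        params={'get': 'Status.JVM.Memory.Heap.Used'}
    ).json()
    print(heap)  # e.g. [{'id': 'Status.JVM.Memory.Heap.Used', 'value': '...'}]
    time.sleep(10)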

I guess the growth comes from JVM heap memory being allocated to the task, with state and intermediate results stored on the heap. But why would it reach an upper bound? I don't use any mechanism such as state TTL to clear the state.

By the way, is there a way to limit the memory usage of a Flink process? In my scenario, Flink will be deployed on IoT devices, so I'd like to cap the resources it can take, for example keeping the TaskManager's memory under 300 MB while the job is running. How can I achieve this?
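To be concrete, what I have in mind is something like the following in flink-conf.yaml (the option name comes from Flink's memory configuration; whether this is the right or complete approach is exactly my question):

# cap the total memory of the TaskManager process (heap, off-heap, metaspace, overhead)
taskmanager.memory.process.size: 300m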
