I'm working on a streaming application with Apache Flink (version 1.17.2) using PyFlink, and I've run into an issue when configuring my Kafka source to resume from the last committed offsets. Additionally, I'm including custom JAR files in my environment setup. When executing my job, I encounter a Py4JError specifically related to initializing Kafka offsets.
Here's the error I'm seeing:
Traceback (most recent call last):
  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
...
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.committedOffsets. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method committedOffsets([class org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy]) does not exist
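To make the message easier to read, here is a small sketch that pulls out the class the call targets and the argument types Py4J looked for, and flags any argument class that sits under a relocated (shaded) package. The helper name `summarize_py4j_error` and the `.shaded.` substring check are my own assumptions for illustration; they are not part of Py4J or Flink.

```python
import re


def summarize_py4j_error(message: str) -> dict:
    """Extract target class, method, and argument classes from a Py4J 'does not exist' error."""
    # "calling z:<fully.qualified.Class>.<method>." names the static call target
    target = re.search(r"calling z:([\w.$]+)\.(\w+)\.", message)
    # "Method <name>([class a.b.C, ...]) does not exist" lists the argument types
    args = re.search(r"Method \w+\(\[(.*?)\]\) does not exist", message)
    arg_classes = re.findall(r"class ([\w.$]+)", args.group(1)) if args else []
    return {
        "target_class": target.group(1) if target else None,
        "method": target.group(2) if target else None,
        "arg_classes": arg_classes,
        # Assumption: a relocated dependency has ".shaded." in its package name
        "shaded_args": [c for c in arg_classes if ".shaded." in c],
    }
```

Running it on the two error lines above gives the target class and the argument class side by side, which is what I would like to compare against the contents of the JARs.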
I'm attempting to use KafkaOffsetsInitializer.committed_offsets() to set the starting offsets for my Kafka source. Along with this, I'm adding the following JAR files to my Flink environment to support Kafka connectivity:
kafka-clients-3.6.0.jar
flink-connector-kafka-1.17.2.jar
flink-sql-connector-kafka-1.17.2.jar
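For anyone trying to reproduce this, a check along these lines might help show which of the three JARs actually ship a given class. It is only a minimal sketch: `jars_providing` is a made-up helper name, the paths are the ones from my setup, and I am assuming the JARs can be read as plain zip archives.

```python
import zipfile


def jars_providing(class_name: str, jar_paths: list[str]) -> list[str]:
    """Return the JARs whose archive contains the .class entry for class_name."""
    # A fully qualified class name maps to a path inside the archive
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for path in jar_paths:
        try:
            with zipfile.ZipFile(path) as jar:
                if entry in jar.namelist():
                    hits.append(path)
        except (FileNotFoundError, zipfile.BadZipFile):
            # Skip paths that are missing or are not valid archives
            continue
    return hits


if __name__ == "__main__":
    jars = [
        "/opt/flink/lib/kafka-clients-3.6.0.jar",
        "/opt/flink/lib/flink-connector-kafka-1.17.2.jar",
        "/opt/flink/lib/flink-sql-connector-kafka-1.17.2.jar",
    ]
    # Class names taken from the error message above
    for name in (
        "org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer",
        "org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy",
        "org.apache.kafka.clients.consumer.OffsetResetStrategy",
    ):
        print(name, "->", jars_providing(name, jars))
```

If a class turns up in more than one JAR, or the shaded and unshaded variants come from different JARs, that would at least narrow down where to look.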
Here's a snippet of my code, including where I add the JAR files:
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()

# Adding JAR files
env.add_jars(
    "file:///opt/flink/lib/kafka-clients-3.6.0.jar",
    "file:///opt/flink/lib/flink-connector-kafka-1.17.2.jar",
    "file:///opt/flink/lib/flink-sql-connector-kafka-1.17.2.jar"
)

# Kafka source that should start from the consumer group's committed offsets
kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka-server:9092") \
    .set_topics("my-topic") \
    .set_group_id("my-consumer-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

# KafkaSource is a unified Source, so it is attached with from_source rather than add_source
env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")
My concerns are:
1. Why is the Py4JError occurring, particularly in relation to KafkaOffsetsInitializer.committed_offsets()?
2. Could the inclusion of these JAR files be affecting how offsets are initialized or how the method is recognized?
3. Is there something I'm missing in the setup that's causing this method to not be recognized, given my PyFlink and Kafka connector versions?
Any advice or insights into resolving this error would be greatly appreciated. I'm aiming to ensure my Flink job can resume processing from where it left off after a crash, without reprocessing or losing data.
I tried to find out why this problem occurs, but I did not find any documentation pointing to a mismatch between my JAR files.