I am trying to read some messages from Kafka with PyFlink:
def read_from_kafka(environment):
source = (KafkaSource.builder()
.set_bootstrap_servers('localhost:9094')
.set_topics("data.orders")
.set_group_id("test_group_1")
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.set_value_only_deserializer(SimpleStringSchema())
.build())
data_stream = (environment
.from_source(source, WatermarkStrategy.no_watermarks(), 'KafkaSource')
.map(lambda x: x.decode())
)
data_stream.print()
environment.execute()
The message looks like
{
"id" : 4,
"weight" : 41,
"comment" : "test message"
}
Bur I get an error:
Traceback (most recent call last):
File "E:\MyProjects\flink_test_project\kafka_json_format.py", line 44, in <module>
read_from_kafka(env)
File "E:\MyProjects\flink_test_project\kafka_json_format.py", line 34, in read_from_kafka
environment.execute()
File "E:\MyProjects\flink_test_project\venv\lib\site-packages\pyflink\datastream\stream_execution_environment.py", line 773, in execute
return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
File "E:\MyProjects\flink_test_project\venv\lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "E:\MyProjects\flink_test_project\venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in deco
return f(*a, **kw)
File "E:\MyProjects\flink_test_project\venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
When I read messages without map()
it works. But when I try to modify data with map
it breaks down. What is going wrong?