PyFlink - getting an error when trying to read data from Kafka

I am trying to read some messages from Kafka with PyFlink:

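from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

def read_from_kafka(environment):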
    source = (KafkaSource.builder()
              .set_bootstrap_servers('localhost:9094')
              .set_topics("data.orders")
              .set_group_id("test_group_1")
              .set_starting_offsets(KafkaOffsetsInitializer.earliest())
              .set_value_only_deserializer(SimpleStringSchema())
              .build())
    data_stream = (environment
                   .from_source(source, WatermarkStrategy.no_watermarks(), 'KafkaSource')
                   .map(lambda x: x.decode())
                   )
    data_stream.print()
    environment.execute()

A message looks like this:

{
  "id" : 4,
  "weight" : 41,
  "comment" : "test message"
}

But I get an error:

Traceback (most recent call last):
  File "E:\MyProjects\flink_test_project\kafka_json_format.py", line 44, in <module>
    read_from_kafka(env)
  File "E:\MyProjects\flink_test_project\kafka_json_format.py", line 34, in read_from_kafka
    environment.execute()
  File "E:\MyProjects\flink_test_project\venv\lib\site-packages\pyflink\datastream\stream_execution_environment.py", line 773, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "E:\MyProjects\flink_test_project\venv\lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "E:\MyProjects\flink_test_project\venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "E:\MyProjects\flink_test_project\venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)

Reading the messages without map() works, but as soon as I add map() to modify the data, the job fails. What is going wrong?
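
One possible cause, offered as a guess because the traceback above is cut off before any "Caused by" section: SimpleStringSchema is expected to hand each record to the Python function as a str, not bytes, and a Python 3 str has no decode() method, so x.decode() would raise AttributeError inside the Python worker. The sketch below runs without Flink and only shows the parsing step; parse_order is a hypothetical helper name, not part of PyFlink, and the sample string is the message from the question written on one line.

```python
import json


def parse_order(value):
    """Parse one Kafka record value into a dict.

    Assumption: with SimpleStringSchema the value arrives as str.
    bytes are also accepted so the helper still works if a raw-bytes
    deserializer is used instead.
    """
    if isinstance(value, (bytes, bytearray)):
        value = value.decode("utf-8")
    return json.loads(value)


# The message from the question, as a single-line JSON string.
sample = '{"id": 4, "weight": 41, "comment": "test message"}'
order = parse_order(sample)
print(order["id"], order["weight"], order["comment"])

# A Python 3 str has no decode() method, which is why x.decode() on a
# str value would fail.
print(hasattr(sample, "decode"))
```

If this is indeed the problem, replacing .map(lambda x: x.decode()) with .map(parse_order) in the job might be enough. If the job still fails, the "Caused by" part of the full Java stack trace should name the actual exception.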
