Unexpected exception on PyFlink job


The key part of my PyFlink program is as follows:

_sen_ml_parse = SenMLParse()

source = KafkaSource.builder() \
        .set_bootstrap_servers('kafka:9092') \
        .set_topics('ETL') \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

ds = ds.map(lambda i: (eval(i)['MSGID'], eval(i)['PAYLOAD'])) \
       .map(lambda i: _sen_ml_parse.senmlparse(i))
ds.print()

SenMLParse is a module defined in another file. The job was submitted successfully, but it failed after a few seconds with the exception "no module named SenMLParse", which confused me a lot. I'd like to understand why this exception occurs. I can think of two possible reasons myself:

  1. The module was not loaded properly. But then how did it pass compilation (the submission was successful)?
  2. The object I declared on the first line isn't a constant, so the operations on the data stream in the last few lines cannot reach it?
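
The first guess can at least be reproduced without Flink. The sketch below rests on an assumption about the mechanism, not on anything confirmed for this job: that the functions passed to `map` are pickled on the client, where `SenMLParse` is importable, and unpickled later in a worker process, where it may not be. It uses the standard `pickle` module as a stand-in for whatever serializer PyFlink actually uses, and the contents of the temporary `SenMLParse.py` file are hypothetical.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical stand-in for the real SenMLParse module; its contents are assumed.
MODULE_SOURCE = '''
class SenMLParse:
    def senmlparse(self, record):
        return record
'''


def simulate_client_and_worker():
    """Pickle where the module is importable, unpickle where it is not."""
    with tempfile.TemporaryDirectory() as module_dir:
        with open(os.path.join(module_dir, "SenMLParse.py"), "w") as f:
            f.write(MODULE_SOURCE)

        # "Client" side: the module is on sys.path, so creating and
        # pickling the object succeeds (like a successful submission).
        sys.path.insert(0, module_dir)
        importlib.invalidate_caches()
        module = importlib.import_module("SenMLParse")
        payload = pickle.dumps(module.SenMLParse())

        # "Worker" side: same bytes, but the module is no longer importable.
        sys.path.remove(module_dir)
        del sys.modules["SenMLParse"]
        importlib.invalidate_caches()
        try:
            pickle.loads(payload)
        except ModuleNotFoundError as exc:
            return str(exc)
    return None


error_message = simulate_client_and_worker()
print(error_message)  # No module named 'SenMLParse'
```

The pickled bytes only record the module and class name, so serializing succeeds wherever the module is importable and the error appears only when another process tries to load them. If that is what happens here, it would explain why submission works and the job fails a few seconds later. In that case, shipping the file with the job, for example with `env.add_python_file(...)` pointing at `SenMLParse.py` before the job is executed, might make it importable on the workers.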