Flink Job not printing anything to StdOut

I have a Flink job and I can't understand why it won't print anything to standard output. If I remove my filter and watermark, I see the raw messages from my Kafka topic. But once I apply the watermark and the aggregation, I get nothing. Here's the code in my main():

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // create Kafka source
    KafkaSource<VideoAdCompletedEvent> videoAdsSource = KafkaSource
        .<VideoAdCompletedEvent>builder()
        .setBootstrapServers(jobConfig.kafkaSourceServer)
        .setTopics(jobConfig.sourceKafkaTopic)
        .setGroupId(jobConfig.consumerGroupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new VideoDeserSchema())
        .build();

    WatermarkStrategy<VideoAdCompletedEvent> watermarkStrategy = WatermarkStrategy
        .<VideoAdCompletedEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30)) // allow events up to 30 seconds out of order
        .withTimestampAssigner((event, timestamp) -> event.getOriginTs())
        .withIdleness(Duration.ofMillis(500));

    DataStream<VideoAdCompletedEvent> videoAdsStream = env
        .fromSource(videoAdsSource, watermarkStrategy, "Mobile Analytics Filtered Video Ads Topic")
        .assignTimestampsAndWatermarks(watermarkStrategy);

    // keep only ads that finished playing
    DataStream<VideoAdCompletedEvent> filteredStream = videoAdsStream
        .filter(new FilterFunction<VideoAdCompletedEvent>() {
            @Override
            public boolean filter(VideoAdCompletedEvent message) throws Exception {
                return message.getCompletedReason().equals("VIDEO_FINISHED")
                    || message.getCompletedReason().equals("video_finished");
            }
        });

    // aggregate per videoId in 30-second tumbling event-time windows
    DataStream<Tuple2<String, Integer>> aggStream = filteredStream
        .keyBy((VideoAdCompletedEvent event) -> event.getVideoId())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .aggregate(new DistinctUserIdAggregate(), new VideoIdCountProcessFunction());

    // the sink's element type must match the stream's element type
    SinkFunction<Tuple2<String, Integer>> printSink = new PrintSinkFunction<>();

    aggStream.addSink(printSink);

    env.execute("Test Job");

Any ideas?

Answer by kkrugler

A few possible explanations...

  1. Your FilterFunction is removing all events.
  2. Your VideoIdCountProcessFunction isn't generating any results.
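  3. No event has a timestamp far enough ahead to fire a window. With 30-second tumbling windows and 30 seconds of allowed out-of-orderness, the window [t, t + 30s) only fires once an event with a timestamp of at least t + 60s arrives. If all events fall within roughly the same 60-second span, no window ever fires.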
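To make explanation 3 concrete, here is a small plain-Java sketch of the timing arithmetic. It assumes `getOriginTs()` returns epoch milliseconds, a zero window offset, and non-negative timestamps. It models Flink's bounded-out-of-orderness rule (watermark = highest timestamp seen minus the out-of-orderness bound minus 1 ms) and the rule that an event-time window fires once the watermark reaches the window's end minus 1 ms. The class and method names are hypothetical; this does not run Flink, it only reproduces the numbers.

```java
import java.util.List;

// Hypothetical helper that models when the question's windows would fire.
class WindowFiringCheck {
    static final long WINDOW_SIZE_MS = 30_000;      // TumblingEventTimeWindows.of(Time.seconds(30))
    static final long OUT_OF_ORDERNESS_MS = 30_000; // forBoundedOutOfOrderness(Duration.ofSeconds(30))

    // Watermark after seeing events up to maxTs (bounded out-of-orderness).
    static long watermark(long maxTs) {
        return maxTs - OUT_OF_ORDERNESS_MS - 1;
    }

    // Start of the tumbling window containing ts (zero offset, ts >= 0 assumed).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // The window containing eventTs fires once watermark >= windowEnd - 1.
    static boolean windowHasFired(long eventTs, long maxTsSeen) {
        long windowEnd = windowStart(eventTs) + WINDOW_SIZE_MS;
        return watermark(maxTsSeen) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        // All timestamps fall before 60_000 ms, so the first window [0, 30_000) never fires.
        List<Long> timestamps = List.of(1_000L, 20_000L, 45_000L, 59_999L);
        long maxTs = timestamps.stream().mapToLong(Long::longValue).max().orElseThrow();
        System.out.println(windowHasFired(1_000L, maxTs));   // false
        System.out.println(windowHasFired(1_000L, 60_000L)); // true
    }
}
```

The sketch relies on millisecond timestamps. If `getOriginTs()` actually returned seconds, each 30-second window would in effect cover 30,000 seconds of real time, and nothing would print for many hours.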

Adding logging and/or counters after each step would help pinpoint where events are being dropped.
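A quick way to check explanation 1 without running Flink is to apply the filter's predicate to a handful of `completedReason` values copied from the topic and count how many are kept. This is a plain-Java sketch; `FilterDropCounter`, `isVideoFinished`, and the sample values are hypothetical. Unlike the question's filter, it is written null-safe, so a missing reason is counted as dropped instead of throwing.

```java
import java.util.List;

// Hypothetical stand-alone check of the question's filter logic.
class FilterDropCounter {
    // Same matching rule as the question's FilterFunction (exact, case-sensitive),
    // but constant.equals(value) returns false instead of throwing when value is null.
    static boolean isVideoFinished(String completedReason) {
        return "VIDEO_FINISHED".equals(completedReason)
            || "video_finished".equals(completedReason);
    }

    // Returns {kept, dropped} for a sample of completedReason values.
    static long[] countKeptAndDropped(List<String> reasons) {
        long kept = reasons.stream().filter(FilterDropCounter::isVideoFinished).count();
        return new long[] {kept, reasons.size() - kept};
    }

    public static void main(String[] args) {
        // Hypothetical sample; replace with values taken from real messages.
        List<String> sample = List.of("VIDEO_FINISHED", "Video_Finished", "VIDEO_FINISHED ", "SKIPPED");
        long[] counts = countKeptAndDropped(sample);
        System.out.println("kept=" + counts[0] + " dropped=" + counts[1]); // kept=1 dropped=3
    }
}
```

Inside the job itself, the same kept/dropped numbers could be tracked with counters obtained from `getRuntimeContext().getMetricGroup()` in a `RichFilterFunction`, or simply logged.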

Note that the call to .assignTimestampsAndWatermarks() should be removed, as you're already passing the watermark strategy to the Kafka source via the .fromSource() call.