Closing Spark Streaming Context after first batch (trying to retrieve kafka offsets)


I am trying to retrieve Kafka offsets for my Spark Batch job. After retrieving the offsets, I would like to close the stream context.

I tried adding a StreamingListener to the streaming context and implementing its onBatchCompleted method to stop the context once the first batch completes, but I am getting the exception "Cannot stop StreamingContext within listener bus thread".

Is there a solution to this? I am trying to retrieve the offsets so that I can call KafkaUtils.createRDD(sparkContext, kafkaProperties, OffsetRange[], LocationStrategy).
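
For reference, the batch read I am ultimately aiming for would look roughly like this (just a sketch; javaSparkContext stands for my existing JavaSparkContext, and getKafkaParam() is the same kafka params map used in the code below):

JavaRDD<ConsumerRecord<String, String>> batchRdd = KafkaUtils.<String, String>createRDD(
        javaSparkContext,                       // existing JavaSparkContext
        getKafkaParam(),                        // same kafka params as the stream below
        offsetRanges,                           // the OffsetRange[] I am trying to obtain
        LocationStrategies.PreferConsistent());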

private OffsetRange[] getOffsets(SparkConf sparkConf) throws InterruptedException {
    final AtomicReference<OffsetRange[]> atomicReference = new AtomicReference<>();

    JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Duration.apply(50));
    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
            sc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(Arrays.asList("test"), getKafkaParam()));

    // Capture the offset ranges of the first batch
    stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
        atomicReference.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
        // sc.stop(false); // this would throw an exception saying the consumer is already closed
    });

    sc.addStreamingListener(new TopicListener(sc)); // throws "Cannot stop StreamingContext within listener bus thread."
    sc.start();
    sc.awaitTermination();
    return atomicReference.get();
}



public class TopicListener implements StreamingListener {

    private JavaStreamingContext sc;

    public TopicListener(JavaStreamingContext sc) {
        this.sc = sc;
    }

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
        // throws "Cannot stop StreamingContext within listener bus thread."
        sc.stop(false);
    }
}

Many thanks, stackoverflow-ers :) I have tried searching for possible solutions but have not been successful so far.

Edit: I used the KafkaConsumer to get the partition info. Once I have the partitions, I create a list of TopicPartition POJOs and then, for each partition, call position to get the current position of my groupId and seekToEnd followed by position (endOffsets would also work) to get the end position.

final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor("theTopicName");
final List<TopicPartition> topicPartitions = new ArrayList<>();
partitionInfos.forEach(partitionInfo ->
        topicPartitions.add(new TopicPartition("theTopicName", partitionInfo.partition())));

final List<OffsetRange> offsetRanges = new ArrayList<>();
kafkaConsumer.assign(topicPartitions);
topicPartitions.forEach(topicPartition -> {
    // current position of my groupId for this partition
    long fromOffset = kafkaConsumer.position(topicPartition);
    // jump to the end of the partition to read its latest offset
    kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
    long untilOffset = kafkaConsumer.position(topicPartition);
    offsetRanges.add(OffsetRange.create(topicPartition.topic(), topicPartition.partition(), fromOffset, untilOffset));
});
return offsetRanges.toArray(new OffsetRange[offsetRanges.size()]);

There is 1 answer below


If you want to control the flow, you may consider using polling instead of the streaming API. That way you can cleanly stop polling once your objectives are met.
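
For example (just a sketch, not your exact setup): a plain KafkaConsumer loop lets you decide exactly when to stop. The topic name, group id and stop condition below are placeholders, and poll(Duration) needs kafka-clients 2.0+ (use poll(long) on older clients):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-batch-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test"));
    boolean done = false;
    while (!done) {
        ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            // process the record ...
        }
        done = !records.isEmpty(); // stop once the objective is met, e.g. after the first non-empty poll
    }
}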

Also check this out ...

https://github.com/dibbhatt/kafka-spark-consumer