I have a requirement to read from multiple Kafka clusters (more than 20) via Spark Streaming.
I am able to read all of them by creating a Kafka direct stream per cluster and performing a union on the resulting streams:
// Build one direct stream per Kafka cluster and union them into a single DStream.
// (Key/value types shown as String/String; adjust to match the actual consumer config.)
JavaDStream<ConsumerRecord<String, String>> kafkaStream = null;
for (String kafkaPod : nextgenKafkaPods) {
    try {
        JavaInputDStream<ConsumerRecord<String, String>> kStream =
                createKafkaStream(appContext.getJavaStreamingContext(), kafkaParamsList.get(kafkaPod));
        // The first stream becomes the base; each subsequent stream is unioned onto it.
        kafkaStream = (kafkaStream == null) ? kStream : kafkaStream.union(kStream);
    } catch (Exception e) {
        LOGGER.error("Unable to create dStream for pod: " + kafkaPod, e);
    }
}
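For context, createKafkaStream is a thin wrapper around the spark-streaming-kafka-0-10 direct stream API, roughly like the sketch below; the topic name, the String/String key/value types, and the shape of the kafkaParamsList entries are simplified placeholders, not the real config:

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Thin wrapper over the kafka-0-10 direct stream API. The topic list and the
// String/String key/value types here are placeholders for illustration.
private JavaInputDStream<ConsumerRecord<String, String>> createKafkaStream(
        JavaStreamingContext jssc, Map<String, Object> kafkaParams) {
    Collection<String> topics = Arrays.asList("my-topic"); // hypothetical topic
    return KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
}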
This works fine until one of the streams hits a failure (the error can occur mid-run or right at startup, e.g. an SSL handshake error). The try/catch above only guards stream creation on the driver; once the unioned stream is running, a failure in any of the underlying streams fails the batch and brings down the whole Spark job, even if only one or a few of the Kafka clusters are having issues. How can I prevent the job from failing in such scenarios?