We need to implement rack awareness for Kafka using the Storm Kafka spout. For this, we need to set the client.rack property to the availability zone of the node where the topology is running. However, the value I get is the availability zone of the master Nimbus node. How can I get the availability zone of the host on which the spout is actually running for this topology?
In short, how can I provide the availability zone of the host node where the spout runs when the KafkaSpout is configured before the stormCluster.submit call?
    KafkaSpoutConfig<String, String> userCardIndexKafkaSpoutConfig =
        KafkaSpoutConfig.builder(userCardIndexSpoutInputKafkaBroker, userCardIndexSpoutInputKafkaTopic)
            .setProp("group.id", userCardIndexSpoutInputKafkaConsumerGroup)
            .setProp("key.deserializer", StringDeserializer.class)
            .setProp("value.deserializer", StringDeserializer.class)
            .setProp("client.rack", availabilityZone)
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setMaxUncommittedOffsets(1)
            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)
            .setOffsetCommitPeriodMs(200)
            .build();

    KafkaSpout<String, String> userCardIndexKafkaSpout = new KafkaSpout<>(userCardIndexKafkaSpoutConfig);
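
For context, the builder chain above runs in the JVM that builds and submits the topology, so availabilityZone is evaluated there and the finished config is serialized and shipped to the workers. To make concrete what "the zone of the host where the spout runs" would mean, here is a minimal sketch of a per-host lookup. It assumes each supervisor host exposes its zone through an environment variable; the variable name AVAILABILITY_ZONE and the class AzResolver are hypothetical and are not part of Storm or Kafka. Such a lookup would have to execute on the worker (for example when the spout is opened) rather than at build time, and the sketch does not show how to wire it into KafkaSpout.

```java
import java.util.Map;

// Hypothetical helper, not a Storm or Kafka API.
class AzResolver {
    // Assumed environment variable, set per host by provisioning.
    static final String AZ_ENV_VAR = "AVAILABILITY_ZONE";

    // Returns the zone found in the given environment, or the fallback
    // when the variable is missing or blank.
    static String resolve(Map<String, String> env, String fallback) {
        String az = env.get(AZ_ENV_VAR);
        if (az == null || az.trim().isEmpty()) {
            return fallback;
        }
        return az.trim();
    }

    public static void main(String[] args) {
        // Reads the environment of whichever JVM runs this code, which is
        // exactly why the call site (submitter vs. worker) matters.
        System.out.println("client.rack=" + resolve(System.getenv(), "unknown"));
    }
}
```

Taking the environment as a parameter keeps the lookup testable and makes it explicit that the result depends on the process that calls it.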