Client is not connected to any Elasticsearch nodes in Flink

I am using Flink 1.1.2 and have added the Elasticsearch dependency in Maven as follows


My program contains the following code, which reads data from Kafka and inserts it into Elasticsearch

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("", "test");

        DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new JoinSchema(), properties));

        System.out.println("reading from kafka ");


        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");   // flush inserts after every event
        config.put("", "elasticsearch_amar"); // default cluster name

        List<InetSocketAddress> transports = new ArrayList<>();
        // set default connection details
        transports.add(new InetSocketAddress(InetAddress.getByName(""), 9300));

        message.addSink(new ElasticsearchSink<>(config, transports, new ElasticInserter()));


        // submit the job; without this call the pipeline is defined but never started
        env.execute();
    } //main

    public static class ElasticInserter implements ElasticsearchSinkFunction<JoinedStreamEvent>{

        public void process(JoinedStreamEvent record, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {

            Map<String, Integer> json = new HashMap<>();

            json.put("Time", record.getPatient_id());
            json.put("heart Rate ", record.getHeartRate());
            json.put("resp rete", record.getRespirationRate());

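            IndexRequest rqst = Requests.indexRequest()
                    .index("nyc-places")           // index name
                    .type("popular-locations")     // mapping name
                    .source(json);                 // document body

            // hand the request to the sink so it is actually sent
            requestIndexer.add(rqst);
        } //process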

    } //ElasticInserter

} //ReadFromKafka

I installed Elasticsearch using Homebrew and then started it with the elasticsearch command, as shown below

There is 1 solution below


I have a few suggestions:

  • First, check whether ES is up; see "Can't Connect to Elasticsearch (through Curl)".
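  • I recommend starting ES in a Docker container, e.g. docker run -d --name es -p 9200:9200 -p 9300:9300 elasticsearch:2 (the code in the question connects to port 9300, so that port has to be published as well).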
  • BTW, you can also try modifying a value in the ES config file, elasticsearch.yml:
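
Going back to the first suggestion (checking whether ES is up): here is a rough sketch of a check you could run from the machine where the Flink job runs. It assumes ES is on localhost with the default ports, 9200 for HTTP (what curl talks to) and 9300 for the transport protocol (what the code in the question connects to). The class name EsPortCheck and the helper isReachable are made up for illustration and are not part of Flink or Elasticsearch.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink or Elasticsearch.
final class EsPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: ES runs on localhost unless a host is passed as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        // Assumption: default ports, 9200 = HTTP and 9300 = transport.
        for (int port : new int[] {9200, 9300}) {
            System.out.println(host + ":" + port + " reachable: "
                    + isReachable(host, port, 2000));
        }
    }
}
```

If 9200 is reachable but 9300 is not, curl will work while the sink still cannot connect. A successful connect only shows that something is listening on the port. It does not confirm that the cluster name in the sink config matches the running cluster.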