Camel Elasticsearch Component throws Connection refused


I'm using the latest Camel (3.17) on Camel K with the Elasticsearch REST component and an Elasticsearch 7.7 instance, all running on a Kubernetes cluster and exposed through Services.

I'm getting Connection refused when running a simple integration with this Route:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.elasticsearch.ElasticsearchComponent;

public class Routes extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {

        // Register the component explicitly, pointing it at the Elasticsearch Service.
        ElasticsearchComponent elasticsearchComponent = new ElasticsearchComponent();
        elasticsearchComponent.setHostAddresses("elasticsearch-rlam-service:9200");
        getContext().addComponent("elasticsearch-rest", elasticsearchComponent);

        from("kafka:dbz?brokers={{kafka.bootstrap.address}}&groupId=apps&autoOffsetReset=earliest")
            .choice()
                .when().simple("${body} == 'null'")
                    .log("Null!")
                .otherwise()
                    .log("Message: ${body}")
                    .to("elasticsearch-rest://elasticsearch?hostAddresses=elasticsearch-rlam-service:9200&operation=INDEX&indexName=dbz")
            .endChoice();
    }
}

Registering the ElasticsearchComponent explicitly is not mandatory for this test, since I'm not connecting through port 9300 with credentials. However, removing it and setting only the hostAddresses option on the endpoint doesn't resolve the issue either.

The stacktrace:

2022-06-20 00:11:35,837 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Null!
2022-06-20 00:11:35,849 INFO [route1] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Message: {"last_name":"Ketchmar","id":1004,"first_name":"Anne","email":"[email protected]"}
2022-06-20 00:11:36,116 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (Camel (camel-1) thread #1 - KafkaConsumer[dbz]) Failed delivery for (MessageId: 274D73D47B2829F-0000000000000000 on ExchangeId: 274D73D47B2829F-0000000000000000). Exhausted after delivery attempt: 1 caught: java.net.ConnectException: Connection refused
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
route1/route1 from[kafka://dbz?autoOffsetReset=earliest&brokers= 272
...
route1/to1 elasticsearch-rest://elasticsearch?hostAddresses=e 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------: java.net.ConnectException: Connection refused
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:918)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:299)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:287)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1632)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1602)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1572)
at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:989)
at org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:170)
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:471)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:193)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:109)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:120)
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:80)
at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:280)
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
... 1 more

I've tried reaching Elasticsearch from another pod, and it works just fine.
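A check from another pod does not show what the integration pod itself can reach. A minimal sketch of a plain TCP probe that could be run from inside the integration's container follows. The class name `ConnectivityCheck` and the `canConnect` helper are hypothetical; the host and port defaults are simply the ones from the route above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectivityCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Resolves the host name and attempts the TCP handshake.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unresolvable host names.
            System.err.println("Cannot reach " + host + ":" + port + " -> " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions taken from the route; override via arguments.
        String host = args.length > 0 ? args[0] : "elasticsearch-rlam-service";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9200;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this probe succeeds from the integration pod while the route still fails, the client is probably connecting to a different address than the one configured in `hostAddresses`, which would narrow the problem down to configuration rather than networking.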
