Pulsar with source connector Rabbitmq error

225 Views Asked by At

I am trying to use as source the RabbitMQ for Apache Pulsar. I run locally the binaries of Pulsar and I use the offical RabbitMQ connector. The RabbitMQ is running in server. For few moments can I see RabbitMQ dashboard that Pulsar is connect but the I get an error in Pulsar,

2022-06-15T19:19:14,949+0300 [function-timer-thread-78-1] ERROR org.apache.pulsar.functions.runtime.process.ProcessRuntime - Health check failed for rabbit-connector-local-0
java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.pulsar.functions.runtime.process.ProcessRuntime.lambda$start$1(ProcessRuntime.java:184) ~[org.apache.pulsar-pulsar-functions-runtime-2.10.0.jar:2.10.0]
at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:54) [org.apache.pulsar-pulsar-common-2.10.0.jar:2.10.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.Status.asRuntimeException(Status.java:535) ~[io.grpc-grpc-api-1.42.1.jar:1.42.1]
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:534) ~[io.grpc-grpc-stub-1.42.1.jar:1.42.1]
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[io.grpc-grpc-core-1.42.1.jar:1.42.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
... 1 more
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /127.0.0.1:46247
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
at io.grpc.netty.shaded.io.netty.channel.unix.Errors.newConnectException0(Errors.java:155) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:128) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:278) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:710) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:687) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:470) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.grpc-grpc-netty-shaded-1.42.1.jar:1.42.1]
... 1 more

My yaml file for rabbitmq is,

configs:                                                                       
    host: "my.server"
    port: 5672
    virtualHost: "/" 
    username: "user"
    password: "pass"
    queueName: "topic"
    connectionName: "my-connection"
    requestedChannelMax: 0

Any ideas? Need to do something additional to my rabbitmq maybe?

1

There are 1 best solutions below

0
On

I have seen that happen before sometimes with disk errors, sometimes with the wrong Java version. Make sure you are using the correct Java version for your version of Pulsar. Which version of Pulsar are you using? If it's the current github use JDK 17, otherwise probably JDK 8 or 11 are your best choice.

Is Pulsar running?

My config file

messaging protocol you need to comma delimit if you have multi

messagingProtocols=amqp

directory

protocolHandlerDirectory=./protocols

--- AMQP

add amqp configs

amqpListeners=amqp://127.0.0.1:5672 amqpTenant=public

See here: https://github.com/tspannhw/FLiPN-AirQuality-Checks