I have an MQ feed that I need to ingest into multiple Kafka clusters. To do this, I ingest it into the first cluster, use the S3 Sink connector to sink the topic to S3, and then try to use the S3 Source connector to re-ingest it into the second cluster. However, the source connector never commits any messages and never gives me any errors; all I get is this output once a minute. I'm running in standalone mode.
INFO [s3-source|worker] PartitionCheckingTask - Checking if Partitions have changed. (io.confluent.connect.cloud.storage.source.util.PartitionCheckingTask:38)
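For context, the sink side of the pipeline (first cluster to S3) works and files do land in the bucket. It's configured roughly along these lines (bucket and topic names here are simplified placeholders, not my real ones):

# sink config (sketch; real bucket/topic names differ)
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test.topic
s3.bucket.name=my-bucket
s3.region=eu-west-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
topics.dir=topics
flush.size=1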
When I stop the source connector process, I get the following error:
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception trying to send
at org.apache.kafka.connect.runtime.WorkerSourceTask.producerSendFailed(WorkerSourceTask.java:159)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:467)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:78)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.KafkaException: Producer closed while send in progress
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:963)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:423)
... 10 more
Caused by: org.apache.kafka.common.KafkaException: Requested metadata update after close
at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:127)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1148)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:997)
... 12 more
That error sounds like I just interrupted an in-flight send by killing the process, but there are only five files in my test directory and I had left the connector running for roughly half an hour, so I'd expect it to have sent something by then. Does anyone know what my issue could be?
Here is my connector config:
name=s3-source
tasks.max=1
connector.class=io.confluent.connect.s3.source.S3SourceConnector
s3.bucket.name=my-bucket
s3.region=eu-west-1
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
topic.regex.list=test.topic:.*
topics.dir=topics
flush.size=1
task.batch.size=1
record.batch.max.size=5
confluent.license=
confluent.topic.bootstrap.servers=kafka-address:9096
confluent.topic.replication.factor=1
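And for completeness, the standalone worker properties I launch it with are along these lines (the bootstrap address and paths below are placeholders for my real ones):

# worker config (sketch; hostname and paths are placeholders)
bootstrap.servers=second-kafka-address:9096
# ByteArray converters to match the ByteArrayFormat above
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect-s3-source.offsets
plugin.path=/usr/share/java,/usr/share/confluent-hub-components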
Thanks!