Aiven GCS connector version: 0.13.0
Kafka version: 3.5.1
This is my connector configuration:
{
"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
"tasks.max": "4",
"topics": "loan_logs",
"gcs.credentials.path": "<credspath>",
"gcs.bucket.name": "datalake-raw",
"file.name.prefix": "lsm/",
"file.name.timestamp.timezone": "Asia/Jakarta",
"file.name.template": "{{topic}}/sink_date={{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}/sink_hour={{timestamp:unit=HH}}/p{{partition:padding=false}}-{{start_offset:padding=true}}.snappy.parquet",
"file.compression.type": "snappy",
"format.output.fields": "key,value,offset,timestamp,headers",
"format.output.type": "parquet",
"format.output.envelope": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"behavior.on.null.values": "ignore"
}
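To make the naming concrete, here is a minimal sketch of how the configured file.name.template (plus file.name.prefix and the Asia/Jakarta timezone) would resolve to an object name. This is a hypothetical re-implementation for illustration, not the connector's code. It assumes that {{start_offset:padding=true}} zero-pads to 20 digits and that the prefix is prepended to the rendered template. The class and method names are made up. The point is that the name depends only on the topic, the hour bucket, the partition and the start offset of the file.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: NOT the connector's implementation, only a model of the template above.
class FileNameSketch {
    static String render(String prefix, String topic, int partition, long startOffset, Instant ts) {
        // file.name.timestamp.timezone = Asia/Jakarta (UTC+7)
        ZonedDateTime t = ts.atZone(ZoneId.of("Asia/Jakarta"));
        String date = t.format(DateTimeFormatter.ofPattern("yyyyMMdd")); // {{timestamp:unit=yyyy}}{{..MM}}{{..dd}}
        String hour = t.format(DateTimeFormatter.ofPattern("HH"));       // {{timestamp:unit=HH}}
        // partition:padding=false -> plain number; start_offset:padding=true -> assumed 20-digit zero padding
        return String.format("%s%s/sink_date=%s/sink_hour=%s/p%d-%020d.snappy.parquet",
                prefix, topic, date, hour, partition, startOffset);
    }

    public static void main(String[] args) {
        // Hypothetical partition 1, start offset 1000, file time 04:06:01 UTC (11:06 in Jakarta)
        System.out.println(render("lsm/", "loan_logs", 1, 1000L, Instant.parse("2024-02-09T04:06:01Z")));
    }
}
```

With these inputs the sketch prints lsm/loan_logs/sink_date=20240209/sink_hour=11/p1-00000000000000001000.snappy.parquet. Two files that start at the same offset within the same hour therefore get the same name.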
Some of the files written by the connector to the designated bucket are 0 bytes, and I wonder whether this is normal behavior. Then, at some point, the connector starts overwriting the files every minute until the next hour begins (the files are split into sink_hour directories), and then the cycle repeats. I found out that overwriting can happen if Kafka reprocesses the messages, but why would Kafka need to reprocess them?
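One possible model of the "overwritten every minute until the next hour" pattern is sketched below. Kafka Connect attempts sink offset commits every offset.flush.interval.ms, a worker setting that defaults to 60000 ms. The sketch assumes that every commit fails and that the task then resumes from the last committed offset, so each new file starts at that same offset. Under those assumptions the object name only changes when the sink_hour bucket changes. All names and the offset value 1000 are hypothetical, and the name format is trimmed to the parts that matter here.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation, assuming every offset commit fails and the task rewinds each cycle.
class RewindSketch {
    static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("yyyyMMdd'/sink_hour='HH").withZone(ZoneId.of("Asia/Jakarta"));

    // Simplified object name: hour directory plus partition and zero-padded start offset.
    static String objectName(int partition, long startOffset, Instant now) {
        return String.format("sink_date=%s/p%d-%020d", HOUR.format(now), partition, startOffset);
    }

    // Returns how many times each object name is written over the given number of commit cycles.
    static Map<String, Integer> simulate(Instant start, int cycles, long intervalMs) {
        Map<String, Integer> writes = new LinkedHashMap<>();
        long committedOffset = 1000L; // hypothetical last successfully committed offset
        for (int i = 0; i < cycles; i++) {
            Instant now = start.plusMillis(i * intervalMs);
            // Reprocessing starts at the last committed offset, so the start offset never advances.
            writes.merge(objectName(0, committedOffset, now), 1, Integer::sum);
            // The commit fails here, so committedOffset stays unchanged for the next cycle.
        }
        return writes;
    }

    public static void main(String[] args) {
        // Ten one-minute cycles starting at 10:55 Jakarta time (03:55 UTC)
        simulate(Instant.parse("2024-02-09T03:55:00Z"), 10, 60_000L)
                .forEach((name, n) -> System.out.println(n + " write(s): " + name));
    }
}
```

In this model the hour-10 object is written five times and the hour-11 object five times, which matches the symptom of files being replaced every minute until the hour rolls over.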
I also found this error in the connector logs; I'm not sure whether it is related to the issue:
2024-02-09 04:06:01,665 ERROR [de-lsm-gcs-connector|task-1] WorkerSinkTask{id=de-lsm-gcs-connector-1} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-lsm-gcs-connector-1]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
...
2024-02-09 04:06:01,665 ERROR [de-lsm-gcs-connector|task-1] WorkerSinkTask{id=de-lsm-gcs-connector-1} Commit of offsets threw an unexpected exception for sequence number 124: null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-lsm-gcs-connector-1]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
...
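The stack trace shows the call chain WorkerSinkTask.commitOffsets -> SinkTask.preCommit -> GcsSinkTask.flush -> Map.forEach -> GcsSinkTask.flushFile. SinkTask.preCommit's default implementation calls flush(), and Map.forEach does not catch exceptions. So one failing file aborts the whole flush, no offsets are committed for the task, and files already uploaded in that cycle get uploaded again after the rewind. The sketch below models only that control flow. CommitSketch and its methods are hypothetical stand-ins; the null check is an assumed failure point, since the real cause behind GcsSinkTask.java:131 is elided in the log. A LinkedHashMap is used for deterministic order, while the trace shows the connector iterates a HashMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the flush/commit control flow seen in the stack trace.
class CommitSketch {
    final List<String> uploaded = new ArrayList<>();

    // Stand-in for flushFile: throws NullPointerException if any buffered record is null.
    void flushFile(String name, List<String> records) {
        for (String r : records) {
            Objects.requireNonNull(r, "null record in " + name); // assumed failure point
        }
        uploaded.add(name); // "upload" succeeds
    }

    // Stand-in for flush: the first exception escapes forEach and skips the remaining files.
    void flush(Map<String, List<String>> buffered) {
        buffered.forEach(this::flushFile);
    }

    // Returns true if offsets would be committed, false if the task would rewind.
    boolean commitCycle(Map<String, List<String>> buffered) {
        try {
            flush(buffered); // preCommit -> flush
            return true;
        } catch (RuntimeException e) {
            return false;    // "Offset commit failed, rewinding to last committed offsets"
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> buffered = new LinkedHashMap<>();
        buffered.put("p0", Arrays.asList("a", "b"));
        buffered.put("p1", Arrays.asList("c", null)); // one bad file
        buffered.put("p2", Arrays.asList("d"));
        CommitSketch task = new CommitSketch();
        System.out.println(task.commitCycle(buffered) + " " + task.uploaded);
        System.out.println(task.commitCycle(buffered) + " " + task.uploaded); // p0 is uploaded again
    }
}
```

Running it prints false [p0] and then false [p0, p0]: p2 is never flushed, and p0 is rewritten on every cycle even though its own data was fine.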
I am using the same configuration on a different Kafka cluster (Kafka 3.3, Aiven connector 0.9.0) and have not seen any similar issue there. Is there a new configuration option I am missing?
Some of the messages have a NULL key; could this be related?