I want to use the Kafka Connect Solace source connector with the Avro converter to publish messages to a Kafka topic. Once the messages are on the Kafka topic, I want to use the Kafka Connect JDBC sink connector, also with the Avro converter, to push them from the topic into an Oracle database. I am able to push messages from Solace to the Kafka topic, but when I run the sink connector it fails with: "org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct".
Below is the config I have:
Solace Source Connector Properties:
name=solaceSourceConnector
connector.class=com.solace.connector.kafka.connect.source.SolaceSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=testtopic
#Remaining properties are solace connection related
JDBC Sink Connector Properties:
name=test-oracle
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.user=orcl
connection.url=jdbc:oracle:thin:@localhost:1521/TESTSERVICE
connection.password=****
topics=testtopic
auto.create=false
table.name.format=TEST_TABLE1
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
After running the Solace source connector, I can see the messages being published to the Kafka topic. But when I run the JDBC sink connector, I get the error below:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:82)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
Kindly help me understand where I have gone wrong, and what can be done to fix it.
I suspect something is off in your setup, or with your Kafka/Avro versions, so the sink cannot interpret the value schema as a Struct.
Try this setup with Apicurio Registry as the schema registry for both the source and the sink.
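As a starting point, the converter section of the connector properties could be switched to the Apicurio Registry Avro converter roughly as sketched below. This is a sketch under assumptions, not a verified fix: the registry URL assumes a default local Apicurio 2.x install on port 8080, and the same value.converter settings would go into both the source and the sink connector properties.

```
# Sketch: Apicurio Registry Avro converter settings (assumed local registry URL)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://localhost:8080/apis/registry/v2
# Auto-register the value schema on first publish (useful on the source side)
value.converter.apicurio.registry.auto-register=true
```

Note that a JDBC sink still requires the deserialized value to be a Struct (a record-type Avro schema with named fields), so check in the registry that the schema the source connector registers is a record and not just bytes or a string.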