I want to use kafka-connect-hdfs to write schemaless JSON records from Kafka to an HDFS file. If I use JsonConverter as the key/value converter, it does not work. If I use StringConverter instead, it writes the JSON as an escaped string.
For example:
actual JSON -
{"name":"test"}
data written to HDFS file -
"{\"name\":\"test\"}"
expected output in HDFS file -
{"name":"test"}
Is there any way I can achieve this, or do I have to use it with a schema only?
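For reference, the records are produced as plain schemaless JSON, for example via the console producer (broker and topic taken from the configuration below; your exact command may differ):
kafka-console-producer --broker-list localhost:9092 --topic test_hdfs_avro
{"name":"test"}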
Below is the exception I get when I try to use JsonConverter:
[2017-09-06 14:40:19,344] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:406)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
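As the error message indicates, with schemas.enable=true the JsonConverter expects every record to be wrapped in a schema/payload envelope rather than being plain JSON. For the example record above, the enveloped form would look roughly like this (the field types shown are only illustrative):
{"schema":{"type":"struct","fields":[{"field":"name","type":"string","optional":false}],"optional":false},"payload":{"name":"test"}}
Since the records in the topic are plain JSON with no such envelope, the converter has to be told not to expect one, which is what schemas.enable=false does.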
Configuration of quickstart-hdfs.properties:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_avro
hdfs.url=hdfs://localhost:9000
flush.size=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
Configuration of connect-avro-standalone.properties:
bootstrap.servers=localhost:9092
schemas.enable=false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
When you specify a converter in your connector's configuration properties, you need to include all the properties pertaining to that converter, regardless of whether such properties are also included in the worker's config.
In the above example, you'll need to specify both:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
in quickstart-hdfs.properties.
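Putting it together, the connector config would look roughly as follows (same topic, HDFS URL, and converters as above; only the two schemas.enable overrides are new):
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_avro
hdfs.url=hdfs://localhost:9000
flush.size=1
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false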
FYI, JSON export is coming up in the HDFS Connector soon. Track the related pull request here: https://github.com/confluentinc/kafka-connect-hdfs/pull/196
Update: JsonFormat has been merged to the master branch.
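Once you are on a build that includes it, switching the output files from Avro to JSON should be a matter of setting the connector's format class, presumably something like the following (check the connector documentation for the exact class name):
format.class=io.confluent.connect.hdfs.json.JsonFormat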