I'm new to Gobblin and am trying to read JSON messages from Kafka, convert them to Avro, and store them in HDFS. My current job file is below:
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=localhost:9092
source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka
converter.classes=org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter, org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
source.schema=[{"columnName":"name", "dataType":{"type": "string"}}, {"columnName":"city", "dataType":{"type": "string"}}, {"columnName":"age", "dataType":{"type": "integer"}}, {"columnName":"ubdated_at", "dataType":{"type": "string"}}]
writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
extract.namespace=gobblin.source.extractor.filebased
gobblin.converter.schemaInjector.schema=SCHEMA
# writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=text
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${gobblin.cluster.work.dir}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
Example Kafka message: {"age": 36, "city": "London", "name": "John", "ubdated_at": "2020-05-19"}
However, when I run this in standalone mode, I get the error below.
ERROR [TaskExecutor-1] org.apache.gobblin.runtime.Task 551 - Task task_GobblinKafkaQuickStart_1589884160573_0 failed
java.lang.IllegalStateException: This is not a JSON Array.
    at com.google.gson.JsonElement.getAsJsonArray(JsonElement.java:106)
    at org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter.convertSchema(JsonStringToJsonIntermediateConverter.java:71)
    at org.apache.gobblin.converter.json.JsonStringToJsonIntermediateConverter.convertSchema(JsonStringToJsonIntermediateConverter.java:48)
    at org.apache.gobblin.instrumented.converter.InstrumentedConverterDecorator.convertSchema(InstrumentedConverterDecorator.java:79)
    at org.apache.gobblin.runtime.MultiConverter.convertSchema(MultiConverter.java:76)
    at org.apache.gobblin.runtime.Task.runSynchronousModel(Task.java:417)
    at org.apache.gobblin.runtime.Task.run(Task.java:368)
    at org.apache.gobblin.runtime.TaskExecutor$TrackingTask.run(TaskExecutor.java:443)
    at org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Could someone help me?
Your gobblin.converter.schemaInjector.schema property just says SCHEMA, and the library expects an actual JSON schema in that place. You can either provide it, or reference the schema you defined above like this:
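A minimal sketch of the second option, assuming Gobblin resolves `${...}` references to other job properties for this key in the same way the job file above already relies on for `${gobblin.cluster.work.dir}`. That assumption is not confirmed for every Gobblin version, so treat the line as a starting point rather than a verified fix:

```properties
# Replace the SCHEMA placeholder with a reference to the JSON array
# already defined in source.schema earlier in the same job file.
# Assumption: ${...} property substitution is applied to this key.
gobblin.converter.schemaInjector.schema=${source.schema}
```

If the substitution does not resolve in your setup, the other option is to paste the same JSON array from source.schema directly as the value of this property.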