Converting YAML to Java Object

I'm currently working with the Zeebe workflow execution engine, trying to upgrade one of the exporters from a TOML configuration file to a YAML file. Here is the original TOML:

[[exporters]]
id = "kafka"
className = "io.zeebe.exporters.kafka.KafkaExporter"

# Top level exporter arguments
[exporters.args]
# Controls how many records can have been sent to the Kafka broker without
# any acknowledgment. Once the limit is reached, the exporter will block and
# wait until at least one record is acknowledged
maxInFlightRecords = 1000
# How often should the exporter drain the in flight records' queue of completed
# requests and update the broker with the guaranteed latest exported position
inFlightRecordCheckIntervalMs = 1000

# Producer specific configuration
[exporters.args.producer]
# The list of initial Kafka broker contact points. The format should be the same
# one as the ProducerConfig expects, i.e. "host:port"
# Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
servers = [ "kafka:9092" ]
# Controls how long the producer will wait for a request to be acknowledged by
# the Kafka broker before retrying it
# Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
requestTimeoutMs = 5000
# Grace period when shutting down the producer in milliseconds
closeTimeoutMs = 5000
# Producer client identifier
clientId = "zeebe"
# Max concurrent requests to the Kafka broker; note that in flight records are batched such that
# in one request you can easily have a thousand records, depending on the producer's batch
# configuration.
maxConcurrentRequests = 3

# Any setting under the following section will be passed verbatim to
# ProducerConfig; you can use this to configure authentication, compression,
# etc. Note that you can overwrite some important settings, so avoid changing
# idempotency, delivery timeout, and retries, unless you know what you're doing
[exporters.args.producer.config]

# Controls which records are pushed to Kafka and to which topic
# Each entry is a sub-map which can contain two entries:
#     type => [string]
#     topic => string
#
# Topic is the topic to which the record with the given value type
# should be sent to, e.g. for a deployment record below we would
# send the record to "zeebe-deployment" topic.
#
# Type is a list of accepted record types, allowing you to filter
# if you want nothing ([]), commands (["command"]), events (["event"]),
# or rejections (["rejection"]), or a combination of the three, e.g.
# ["command", "event"].
[exporters.args.records]
# If a record value type is omitted in your configuration file,
# it will fall back to whatever is configured in the defaults
defaults = { type = [ "event" ], topic = "zeebe" }
# For records with a value of type DEPLOYMENT
deployment = { topic = "zeebe-deployment" }
# For records with a value of type INCIDENT
incident = { topic = "zeebe-incident" }
# For records with a value of type JOB_BATCH
jobBatch = { topic = "zeebe-job-batch" }
# For records with a value of type JOB
job = { topic = "zeebe-job" }
# For records with a value of type MESSAGE
message = { topic = "zeebe-message" }
# For records with a value of type MESSAGE_SUBSCRIPTION
messageSubscription = { topic = "zeebe-message-subscription" }
# For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
messageStartEventSubscription = { topic = "zeebe-message-subscription-start-event" }
# For records with a value of type RAFT
raft = { topic = "zeebe-raft" }
# For records with a value of type TIMER
timer = { topic = "zeebe-timer" }
# For records with a value of type VARIABLE
variable = { topic = "zeebe-variable" }
# For records with a value of type WORKFLOW_INSTANCE
workflowInstance = { topic = "zeebe-workflow" }
# For records with a value of type WORKFLOW_INSTANCE_SUBSCRIPTION
workflowInstanceSubscription = { topic = "zeebe-workflow-subscription" }

This is how I currently have it translated to YAML:

- id: kafka
  className: io.zeebe.exporters.kafka.KafkaExporter
  args:
    maxInFlightRecords: 1000
    inFlightRecordCheckIntervalMs: 1000
    producer:
      servers:
        - 'localhost:9092'
      requestTimeoutMs: 5000
      closeTimeoutMs: 5000
      clientId: zeebe
      maxConcurrentRequests: 3
      config: {}
    records:
      defaults:
        type:
          - event
        topic: zeebe
      deployment:
        topic: zeebe-deployment
      incident:
        topic: zeebe-incident
      jobBatch:
        topic: zeebe-job-batch
      job:
        topic: zeebe-job
      message:
        topic: zeebe-message
      messageSubscription:
        topic: zeebe-message-subscription
      messageStartEventSubscription:
        topic: zeebe-message-subscription-start-event
      raft:
        topic: zeebe-raft
      timer:
        topic: zeebe-timer
      variable:
        topic: zeebe-variable
      workflowInstance:
        type:
          - event
        topic: zeebe-workflow
      workflowInstanceSubscription:
        topic: zeebe-workflow-subscription

I'm currently running into an issue with the exporter in this method:

@Override
public void configure(Context context) {
  this.logger = context.getLogger();
  this.id = context.getConfiguration().getId();

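  // Per the stack trace below, this instantiate() call is where the Gson
  // error is actually thrown (KafkaExporter.java:81)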
  final TomlConfig tomlConfig = context.getConfiguration().instantiate(TomlConfig.class);
  this.config = this.configParser.parse(tomlConfig);
  this.recordHandler = new RecordHandler(this.config.getRecords());

  context.setFilter(new KafkaRecordFilter(this.config.getRecords())); // <-- This line specifically
  this.logger.debug("Configured exporter {}", this.id);
}

When I look at how the broker parses the YAML, it shows this JSON object:

"1" : {
  "jarPath" : null,
  "className" : "io.zeebe.exporters.kafka.KafkaExporter",
  "args" : {
    "maxInFlightRecords" : 1000,
    "inFlightRecordCheckIntervalMs" : 1000,
    "producer" : {
      "servers" : {
        "0" : "localhost:9092"
      },
      "requestTimeoutMs" : 5000,
      "closeTimeoutMs" : 5000,
      "clientId" : "zeebe",
      "maxConcurrentRequests" : 3
    },
    "records" : {
      "defaults" : {
        "type" : {
          "0" : "event"
        },
        "topic" : "zeebe"
      },
      "deployment" : {
        "topic" : "zeebe-deployment"
      },
      "incident" : {
        "topic" : "zeebe-incident"
      },
      "jobBatch" : {
        "topic" : "zeebe-job-batch"
      },
      "job" : {
        "topic" : "zeebe-job"
      },
      "message" : {
        "topic" : "zeebe-message"
      },
      "messageSubscription" : {
        "topic" : "zeebe-message-subscription"
      },
      "messageStartEventSubscription" : {
        "topic" : "zeebe-message-subscription-start-event"
      },
      "raft" : {
        "topic" : "zeebe-raft"
      },
      "timer" : {
        "topic" : "zeebe-timer"
      },
      "variable" : {
        "topic" : "zeebe-variable"
      },
      "workflowInstance" : {
        "type" : {
          "0" : "event"
        },
        "topic" : "zeebe-workflow"
      },
      "workflowInstanceSubscription" : {
        "topic" : "zeebe-workflow-subscription"
      }
    }
  },

Specifically this part:

"producer" : {
  "servers" : {
    "0" : "localhost:9092"
  },

Should be:

"producer" : {
  "servers" : [ "localhost:9092" ]
},

I'm also getting this error from the main broker, which calls this exporter:

Caused by: java.lang.IllegalStateException: Expected BEGIN_ARRAY but was BEGIN_OBJECT at path $.producer.servers
at com.google.gson.internal.bind.JsonTreeReader.expect(JsonTreeReader.java:163) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.JsonTreeReader.beginArray(JsonTreeReader.java:72) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:61) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131) ~[gson-2.8.6.jar:?]
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:932) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:1003) ~[gson-2.8.6.jar:?]
at com.google.gson.Gson.fromJson(Gson.java:975) ~[gson-2.8.6.jar:?]
at io.zeebe.broker.exporter.context.ExporterConfiguration.instantiate(ExporterConfiguration.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.exporters.kafka.KafkaExporter.configure(KafkaExporter.java:81) ~[zeebe-kafka-exporter-1.2.0-SNAPSHOT-uber.jar:1.2.0-SNAPSHOT]
at io.zeebe.broker.exporter.repo.ExporterRepository.validate(ExporterRepository.java:91) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:54) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:84) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.system.partitions.ZeebePartition.<init>(ZeebePartition.java:138) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.lambda$partitionsStep$17(Broker.java:339) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:60) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:58) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.partitionsStep(Broker.java:346) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.lambda$initStart$9(Broker.java:183) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:60) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:58) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:88) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:43) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.internalStart(Broker.java:135) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.util.LogUtil.doWithMDC(LogUtil.java:21) ~[zeebe-util-0.23.1.jar:0.23.1]
at io.zeebe.broker.Broker.start(Broker.java:115) ~[zeebe-broker-0.23.1.jar:0.23.1]
at io.zeebe.broker.StandaloneBroker.run(StandaloneBroker.java:59) ~[zeebe-distribution-0.23.1.jar:0.23.1]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 5 more

So what I want to know is: is this an issue with the YAML formatting, or with the exporter code itself? I ran the exporter's unit tests and even upgraded its dependencies to the current version (which introduced the breaking change), but the tests all pass. The code base for the exporter is here: https://github.com/zeebe-io/zeebe-kafka-exporter and the broker is here: https://github.com/zeebe-io/zeebe. Any help would be greatly appreciated.

There is 1 answer below.

This is a bug in the configuration conversion. I created a bug issue for it: https://github.com/zeebe-io/zeebe/issues/4552
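
Until that is fixed, a possible interim workaround on the exporter side is to normalize index-keyed maps back into lists before handing the raw arguments to Gson yourself. A rough sketch (this helper is hypothetical, not part of the exporter or the broker API):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public final class ArgsNormalizer {

  private ArgsNormalizer() {}

  // Recursively converts maps whose keys are all "0", "1", ... back into lists
  @SuppressWarnings("unchecked")
  public static Object normalize(final Object node) {
    if (!(node instanceof Map)) {
      return node;
    }
    final Map<String, Object> map = (Map<String, Object>) node;
    if (!map.isEmpty() && map.keySet().stream().allMatch(k -> k.matches("\\d+"))) {
      // Sort numerically so "10" comes after "9", then recurse into the values
      final Map<Integer, Object> sorted = new TreeMap<>();
      map.forEach((k, v) -> sorted.put(Integer.valueOf(k), normalize(v)));
      return new ArrayList<>(sorted.values());
    }
    final Map<String, Object> copy = new LinkedHashMap<>();
    map.forEach((k, v) -> copy.put(k, normalize(v)));
    return copy;
  }
}

You could then feed the normalized map to Gson yourself, e.g. gson.fromJson(gson.toJsonTree(ArgsNormalizer.normalize(rawArgs)), TomlConfig.class), instead of relying on the broker's instantiate(...) — where rawArgs stands for however you obtain the raw arguments map.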

Be aware that exporters are now configured as a map instead of a list, which means you should configure yours like this:

exporters:
  kafka:
    className: io.zeebe.exporters.kafka.KafkaExporter
    args:

Instead of:

exporters:
  - id: kafka
    className: io.zeebe.exporters.kafka.KafkaExporter
    args:
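
Putting the args from your question into the new map format gives roughly this sketch (note that the servers list will still be mangled by the bug above until it is fixed):

exporters:
  kafka:
    className: io.zeebe.exporters.kafka.KafkaExporter
    args:
      maxInFlightRecords: 1000
      inFlightRecordCheckIntervalMs: 1000
      producer:
        servers:
          - localhost:9092
        requestTimeoutMs: 5000
        closeTimeoutMs: 5000
        clientId: zeebe
        maxConcurrentRequests: 3
        config: {}
      records:
        defaults:
          type:
            - event
          topic: zeebe
        deployment:
          topic: zeebe-deployment
        # ... remaining record/topic entries exactly as in the question's YAML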