How do I deserialise Divolte Avro records from Kafka using Kafka Connect with Schema Registry?


How can I use Kafka Connect to consume and deserialise the Avro records that Divolte collector writes to Kafka? I want to turn the serialised Avro records back into JSON events and sink them into a Kinesis data stream.

I am using the AWS Labs Kinesis Streams sink plugin, with the configuration set as below. I originally attempted to run Divolte in naked mode with the registryless Avro converter (commented out in the worker config below), but I couldn't get this to work: it threw back the error "not an Avro file".

Then I swapped Divolte over to confluent mode in order to use the Confluent registry and its io.confluent.connect.avro.AvroConverter, also seen below (uncommented). This throws the error:

kafka-connect| Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

This is unexpected, as I'm running Divolte in confluent mode and the deserialiser is the Confluent deserialiser. Below is the worker config:

bootstrap.servers=broker:29092

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081/subjects/Kafka-key/versions/1
value.converter.schema.registry.url=http://localhost:8081/subjects/Kafka-key/versions/1

#key.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
#value.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
#key.converter.schema.path=/opt/kafka/config/DefaultEventRecord.avsc
#value.converter.schema.path=/opt/kafka/config/DefaultEventRecord.avsc

key.converter.schemas.enable=false
value.converter.schemas.enable=false

#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

offset.storage.file.filename=offset.log
schemas.enable=false
plugin.path=/opt/kafka/plugins/
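
For reference, Divolte's confluent mode should produce records in the Confluent wire format: a single 0x00 magic byte, a 4-byte big-endian schema ID, and then the raw Avro payload. A quick sketch for inspecting the first bytes of one record on the topic (assuming the plain console consumer is on the PATH and the broker is reachable on localhost:9092):

kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream \
  --from-beginning --max-messages 1 | head -c 5 | xxd

If the dump doesn't start with 00 followed by a 4-byte schema ID, the records are not framed the way AvroConverter expects.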

My Divolte collector configuration seems correct: I have set confluent_id = 1, corresponding to the ID returned when the default Divolte schema is posted to the schema registry:

divolte {
  global {
    server {
      host = 0.0.0.0
      host = ${?DIVOLTE_HOST}
      port = 8290
      port = ${?DIVOLTE_PORT}
      use_x_forwarded_for = false
      use_x_forwarded_for = ${?DIVOLTE_USE_XFORWARDED_FOR}
      serve_static_resources = true
      serve_static_resources = ${?DIVOLTE_SERVICE_STATIC_RESOURCES}
      debug_requests = false
    }

    mapper {
      buffer_size = 1048576
      threads = 1
      duplicate_memory_size = 1000000
      user_agent_parser {
        type = non_updating
        cache_size = 1000
      }
    }

    kafka {
      enabled = false
      enabled = ${?DIVOLTE_KAFKA_ENABLED}
      threads = 2
      buffer_size = 1048576
      producer = {
        bootstrap.servers = ["localhost:9092"]
        bootstrap.servers = ${?DIVOLTE_KAFKA_BROKER_LIST}
        client.id = divolte.collector
        client.id = ${?DIVOLTE_KAFKA_CLIENT_ID}
        acks = 1
        retries = 0
        compression.type = lz4
        max.in.flight.requests.per.connection = 1

        sasl.jaas.config = ""
        sasl.jaas.config = ${?KAFKA_SASL_JAAS_CONFIG}

        security.protocol = PLAINTEXT
        security.protocol = ${?KAFKA_SECURITY_PROTOCOL}
        sasl.mechanism = GSSAPI
        sasl.kerberos.service.name = kafka
      }
    }
  }

  sources {
    browser1 = {
      type = browser
      event_suffix = event
      party_cookie = _dvp
      party_cookie = ${?DIVOLTE_PARTY_COOKIE}
      party_timeout = 730 days
      party_timeout = ${?DIVOLTE_PARTY_TIMEOUT}
      session_cookie = _dvs
      session_cookie = ${?DIVOLTE_SESSION_COOKIE}
      session_timeout = 30 minutes
      session_timeout = ${?DIVOLTE_SESSION_TIMEOUT}
      cookie_domain = ''
      cookie_domain = ${?DIVOLTE_COOKIE_DOMAIN}

      javascript {
        name = divolte.js
        name = ${?DIVOLTE_JAVASCRIPT_NAME}
        logging = false
        logging = ${?DIVOLTE_JAVASCRIPT_LOGGING}
        debug = false
        debug = ${?DIVOLTE_JAVASCRIPT_DEBUG}
        auto_page_view_event = true
        auto_page_view_event = ${?DIVOLTE_JAVASCRIPT_AUTO_PAGE_VIEW_EVENT}
      }
    }
  }

  sinks {
    kafka1 {
      type = kafka
      mode = confluent
      topic = clickstream
      topic = ${?DIVOLTE_KAFKA_TOPIC}
    }
  }

  mappings {
    a_mapping = {
      //schema_file = /opt/divolte/conf/DefaultEventRecord.avsc
      //mapping_script_file = schema-mapping.groovy
      confluent_id = 1
      sources = [browser1]
      sinks = [kafka1]
    }
  }
}

As can be seen from the Divolte configuration, I have not declared a schema or mapping, so the defaults are used. I therefore registered the default DefaultEventRecord.avsc schema with the schema registry as:

{"schema":"{\"namespace\":\"io.divolte.record\",\"type\":\"record\",\"name\":\"DefaultEventRecord\",\"fields\":[{\"name\":\"detectedDuplicate\",\"type\":\"boolean\"},{\"name\":\"detectedCorruption\",\"type\":\"boolean\"},{\"name\":\"firstInSession\",\"type\":\"boolean\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"clientTimestamp\",\"type\":\"long\"},{\"name\":\"remoteHost\",\"type\":\"string\"},{\"name\":\"referer\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"location\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"viewportPixelWidth\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"viewportPixelHeight\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"screenPixelWidth\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"screenPixelHeight\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"partyId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sessionId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"pageViewId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"eventType\",\"type\":\"string\",\"default\":\"unknown\"},{\"name\":\"userAgentString\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentFamily\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentVendor\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentType\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentVersion\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentDeviceCategory\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsFamily\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsVersion\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsVendor\",\"type\":[\"null\",\"string\"],\"default\":null}]}"} 

and posted it to the schema registry with the command

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @divolte-schema-v1.avsc http://localhost:8081/subjects/Kafka-key/versions

and included the URL in the worker configuration.
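
For reference, the registry's read-only endpoints can be used to double-check what was actually stored (assuming the registry on localhost:8081):

curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/Kafka-key/versions/1
curl http://localhost:8081/schemas/ids/1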

If anyone could point me in the right direction I would be grateful. Since Divolte collector is designed to write to Kafka and even has a built-in confluent mode, I would expect that consuming from Kafka and deserialising with Kafka Connect's built-in converters should be straightforward, but clearly something is conflicting. Is my schema file correct? Am I using the correct converters to undo Divolte's Avro serialisation? Have I configured Divolte correctly for confluent mode, and is the confluent ID even correct? The documentation is not very specific on this (I set the ID to 1, which is the ID returned when I post my first schema to the schema registry after it starts up).

1 Answer

I think you may have misconfigured Kafka Connect. The internal.*.converter properties should not be touched; they are deprecated, in fact, and should be removed.

The schema.registry.url values should not point at any subject or version, only at the base URL, http://localhost:8081.

A bare schemas.enable property on its own does nothing. schemas.enable is only meaningful on a JsonConverter, i.e. you set value.converter=org.apache.kafka.connect.json.JsonConverter and only then can you set value.converter.schemas.enable. Avro always has a schema, and that cannot be "disabled".
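
Putting those points together, a corrected worker config would look something like this (a sketch keeping the broker address, offset file, and plugin path from the question):

bootstrap.servers=broker:29092

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

# no internal.*.converter properties, no schemas.enable: Avro always has a schema
# if the keys turn out not to be Avro, key.converter could instead be
# org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=offset.log
plugin.path=/opt/kafka/plugins/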


You've also posted the schema to the wrong subject. Kafka-key is the subject for key schemas of a topic named Kafka.

Your Divolte config says topic = clickstream, not Kafka. Plus, you need to POST two different schemas if the key and value are actually different payloads, under subjects/clickstream-key and subjects/clickstream-value. The HTTP response to each POST contains the schema ID, and that ID is what belongs in your config as confluent_id; see the example below.
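
For example, a sketch of registering the value schema under the subject that matches the clickstream topic (the id field of the response is the value to use as confluent_id):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data @divolte-schema-v1.avsc \
  http://localhost:8081/subjects/clickstream-value/versions
# response is of the form {"id":1}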


In any case, I recommend skipping Kafka Connect for now. You can debug the events written to Kafka simply using kafka-avro-console-consumer, and if that works, then use AvroConverter in Connect. More to the point, the "registryless" converter doesn't use schema IDs at all, so setting confluent_id = 1 wouldn't make sense with it.

Plus, the key and value converters do not need to be the same. You can add --property print.key=true to the console consumer to deserialise both key and value as Avro. Otherwise, I suspect your error comes from the keys not being Avro...
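
A minimal invocation, assuming the Confluent CLI tools are on the PATH and the broker and registry are on their default local ports:

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic clickstream --from-beginning \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true

If the values decode but the command fails on the keys, that confirms the keys are not Confluent-framed Avro, and the worker's key.converter should be changed accordingly.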