How can I use Kafka Connect to consume and deserialise the Avro records that the Divolte collector writes to Kafka? I want to turn the serialised Avro records back into JSON events and sink them into a Kinesis data stream.
I am using the AWS Labs Kinesis Streams sink plugin, and I have the worker configuration set as below. I originally attempted to use Divolte in naked mode with the registryless Avro converter imported (commented out in the worker config below), but I couldn't get this to work: it threw back the error "not an Avro file".
I then switched Divolte to confluent mode in order to use the Confluent registry and converter, io.confluent.connect.avro.AvroConverter, also seen below (uncommented). This throws the error:
kafka-connect| Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
This is unexpected, as I'm running Divolte in confluent mode and the deserialiser is the Confluent deserialiser. Below is the worker config:
bootstrap.servers=broker:29092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081/subjects/Kafka-key/versions/1
value.converter.schema.registry.url=http://localhost:8081/subjects/Kafka-key/versions/1
#key.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
#value.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
#key.converter.schema.path=/opt/kafka/config/DefaultEventRecord.avsc
#value.converter.schema.path=/opt/kafka/config/DefaultEventRecord.avsc
key.converter.schemas.enable=false
value.converter.schemas.enable=false
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log
schemas.enable=false
plugin.path=/opt/kafka/plugins/
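For context on the error: the Confluent wire format that the AvroConverter expects starts with a 0x00 magic byte followed by a 4-byte schema ID, so "Unknown magic byte!" means the first byte of the key or value isn't 0x00. A way I could inspect the raw bytes, assuming kafkacat/kcat is available in the environment (broker address and topic taken from my configs):
# hex-dump the first record's value; Confluent-framed data begins with 00 plus a 4-byte schema ID
kafkacat -C -b broker:29092 -t clickstream -c 1 -e -f '%s' | xxd | head -n 2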
My Divolte collector configuration seems correct: I have added a confluent_id of 1, corresponding to the default Divolte schema's ID when the schema is posted to the schema registry:
divolte {
  global {
    server {
      host = 0.0.0.0
      host = ${?DIVOLTE_HOST}
      port = 8290
      port = ${?DIVOLTE_PORT}
      use_x_forwarded_for = false
      use_x_forwarded_for = ${?DIVOLTE_USE_XFORWARDED_FOR}
      serve_static_resources = true
      serve_static_resources = ${?DIVOLTE_SERVICE_STATIC_RESOURCES}
      debug_requests = false
    }
    mapper {
      buffer_size = 1048576
      threads = 1
      duplicate_memory_size = 1000000
      user_agent_parser {
        type = non_updating
        cache_size = 1000
      }
    }
    kafka {
      enabled = false
      enabled = ${?DIVOLTE_KAFKA_ENABLED}
      threads = 2
      buffer_size = 1048576
      producer = {
        bootstrap.servers = ["localhost:9092"]
        bootstrap.servers = ${?DIVOLTE_KAFKA_BROKER_LIST}
        client.id = divolte.collector
        client.id = ${?DIVOLTE_KAFKA_CLIENT_ID}
        acks = 1
        retries = 0
        compression.type = lz4
        max.in.flight.requests.per.connection = 1
        sasl.jaas.config = ""
        sasl.jaas.config = ${?KAFKA_SASL_JAAS_CONFIG}
        security.protocol = PLAINTEXT
        security.protocol = ${?KAFKA_SECURITY_PROTOCOL}
        sasl.mechanism = GSSAPI
        sasl.kerberos.service.name = kafka
      }
    }
  }
  sources {
    browser1 = {
      type = browser
      event_suffix = event
      party_cookie = _dvp
      party_cookie = ${?DIVOLTE_PARTY_COOKIE}
      party_timeout = 730 days
      party_timeout = ${?DIVOLTE_PARTY_TIMEOUT}
      session_cookie = _dvs
      session_cookie = ${?DIVOLTE_SESSION_COOKIE}
      session_timeout = 30 minutes
      session_timeout = ${?DIVOLTE_SESSION_TIMEOUT}
      cookie_domain = ''
      cookie_domain = ${?DIVOLTE_COOKIE_DOMAIN}
      javascript {
        name = divolte.js
        name = ${?DIVOLTE_JAVASCRIPT_NAME}
        logging = false
        logging = ${?DIVOLTE_JAVASCRIPT_LOGGING}
        debug = false
        debug = ${?DIVOLTE_JAVASCRIPT_DEBUG}
        auto_page_view_event = true
        auto_page_view_event = ${?DIVOLTE_JAVASCRIPT_AUTO_PAGE_VIEW_EVENT}
      }
    }
  }
  sinks {
    kafka1 {
      type = kafka
      mode = confluent
      topic = clickstream
      topic = ${?DIVOLTE_KAFKA_TOPIC}
    }
  }
  mappings {
    a_mapping = {
      //schema_file = /opt/divolte/conf/DefaultEventRecord.avsc
      //mapping_script_file = schema-mapping.groovy
      confluent_id = 1
      sources = [browser1]
      sinks = [kafka1]
    }
  }
}
As can be seen from the Divolte configuration, I have not declared a schema or a schema mapping, so the defaults are used. In this case I wrapped the default DefaultEventRecord.avsc schema in a registry request payload:
{"schema":"{\"namespace\":\"io.divolte.record\",\"type\":\"record\",\"name\":\"DefaultEventRecord\",\"fields\":[{\"name\":\"detectedDuplicate\",\"type\":\"boolean\"},{\"name\":\"detectedCorruption\",\"type\":\"boolean\"},{\"name\":\"firstInSession\",\"type\":\"boolean\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"clientTimestamp\",\"type\":\"long\"},{\"name\":\"remoteHost\",\"type\":\"string\"},{\"name\":\"referer\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"location\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"viewportPixelWidth\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"viewportPixelHeight\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"screenPixelWidth\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"screenPixelHeight\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"partyId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sessionId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"pageViewId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"eventType\",\"type\":\"string\",\"default\":\"unknown\"},{\"name\":\"userAgentString\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentFamily\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentVendor\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentType\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentVersion\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentDeviceCategory\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsFamily\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsVersion\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsVendor\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}
and posted it to the schema registry with the command
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @divolte-schema-v1.avsc http://localhost:8081/subjects/Kafka-key/versions
and included the URL in the worker configuration.
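To double-check what the registry stored, and which ID it assigned, I can read the subject back (assuming the registry is reachable at localhost:8081):
# list subjects and inspect the version I just posted; the response includes the "id" field
curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/Kafka-key/versions/1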
If anyone could point me in the right direction I would be grateful. As the Divolte collector is designed to write to Kafka and even has a built-in confluent mode, I expected that consuming from Kafka with Kafka Connect and deserialising with its built-in converters would be straightforward, but clearly something is conflicting. Is my schema file correct? Am I using the correct converters to undo Divolte's Avro serialisation? Have I configured Divolte correctly to work in confluent mode, and is the confluent ID even correct? The documentation is not very specific on this (I set the ID to 1, which is the ID returned when I post my first schema to the schema registry after it starts up).
I think you may have misconfigured Kafka Connect.
The internal.*.converter properties should not be touched. They are deprecated, in fact, and should be removed.
The schema.registry.url values should not refer to any version, only http://localhost:8081.
schemas.enable on its own does nothing. It is only a setting for the JsonConverter, as in value.converter=org.apache.kafka.connect.json.JsonConverter, and only then can you set value.converter.schemas.enable. Avro always has a schema, and it cannot be "disabled".
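For illustration, a minimal sketch of what the converter portion of the worker config might look like under those rules, assuming the registry really is at localhost:8081 (and if the keys turn out not to be Avro, swap the key converter for something like org.apache.kafka.connect.storage.StringConverter):
bootstrap.servers=broker:29092
# Avro values via the registry; no subject or version path in the URL
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
# no internal.*.converter lines and no *.schemas.enable needed for Avro
offset.storage.file.filename=offset.log
plugin.path=/opt/kafka/plugins/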
You've also posted the schema to the wrong path: Kafka-key is the subject for key schemas of a topic named Kafka, but your Divolte config says topic = clickstream, not Kafka. Plus, you need to POST two different schemas if the key and the value are actually different payloads, i.e. to subjects/clickstream-key and subjects/clickstream-value. When you do the HTTP request, you will get back a response containing that schema's ID, which is the number you need to put in your config as confluent_id.
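As a sketch, assuming the topic really is clickstream and Divolte is writing this schema as the record value, the registration and the ID you get back would look something like:
# register the value schema under the subject matching the topic name
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data @divolte-schema-v1.avsc \
  http://localhost:8081/subjects/clickstream-value/versions
# the response is of the form {"id":1} -- that number is what goes into confluent_id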
In any case, I recommend skipping Kafka Connect for now. You can debug the events written to Kafka simply using kafka-avro-console-consumer, and if that works, then you should use the AvroConverter from Connect. (More specifically, the "registryless" converter doesn't use schema IDs, so setting confluent_id = 1 wouldn't make sense with it.)
Also, the key and value converters do not need to be the same. You can add --property print.key=true to the console consumer to try deserialising the keys as Avro as well; otherwise, I suspect your error comes from the keys not being Avro...
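A sketch of that debugging step, assuming the broker and the registry are reachable from wherever you run the tool at localhost:9092 and http://localhost:8081 (adjust for your Docker networking):
# decode the Divolte events straight from the topic via the schema registry
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic clickstream \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true
If the values print as JSON-rendered records, the producer side is fine and the problem is isolated to the Connect converter configuration (or to the keys).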