I am trying to create streams in ksqlDB from Kafka topics serialized with Avro. I am using Apicurio Registry to manage the schemas. The topics are created by Debezium, which monitors changes in a MySQL database.
In my tests, I have managed to create the topics, but when I try to create a stream using
CREATE STREAM customers (id INT, first_name VARCHAR, last_name VARCHAR, email VARCHAR) WITH (KAFKA_TOPIC='dbserver1.inventory.customers', VALUE_FORMAT='AVRO');
I get the following error:
Could not get latest schema for subject dbserver1.inventory.customers-value Caused by: Failed to discover artifact type from content.; error code: 0
I've created my stack with two docker-compose files:
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    networks:
      - etl-streaming-network
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
    networks:
      - etl-streaming-network
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    networks:
      - etl-streaming-network
  apicurio:
    image: apicurio/apicurio-registry-mem:2.2.5.Final
    ports:
      - 8080:8080
    networks:
      - etl-streaming-network
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
      - apicurio
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - ENABLE_APICURIO_CONVERTERS=true
    networks:
      - etl-streaming-network
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:9092
    links:
      - kafka
    networks:
      - etl-streaming-network
networks:
  etl-streaming-network:
    driver: bridge
and
version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
      - apicurio
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_HOST_NAME: ksql-server
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://apicurio:8080/apis/ccompat/v6
    networks:
      - etl-streaming-network
    restart: unless-stopped
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    restart: unless-stopped
    networks:
      - etl-streaming-network
networks:
  etl-streaming-network:
    driver: bridge
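ksqlDB talks to Apicurio through its Confluent-compatible API (`/apis/ccompat/v6`), and the failing lookup is for the subject `dbserver1.inventory.customers-value`, i.e. the topic name plus `-value`. Repeating that lookup by hand can show whether the registry answers it at all, independently of ksqlDB. The following is a minimal Python sketch, assuming port 8080 is reachable as `localhost` from wherever it runs (as published in the compose file); the helper names and `CCOMPAT_BASE` are hypothetical.

```python
import json
import urllib.parse
import urllib.request

# Assumption: Apicurio's Confluent-compatible API, published on the host at port 8080.
CCOMPAT_BASE = "http://localhost:8080/apis/ccompat/v6"


def value_subject(topic: str) -> str:
    """Subject name for a topic's value schema, matching the one in the ksqlDB error."""
    return f"{topic}-value"


def latest_schema_url(base: str, subject: str) -> str:
    """Confluent-style URL for the latest registered version of a subject."""
    return f"{base}/subjects/{urllib.parse.quote(subject, safe='')}/versions/latest"


def fetch_latest_schema(base: str, topic: str) -> dict:
    """GET the latest value schema for a topic and decode the JSON response body.

    Raises urllib.error.HTTPError if the registry answers with a 4xx/5xx status.
    """
    url = latest_schema_url(base, value_subject(topic))
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)
```

For example, `fetch_latest_schema(CCOMPAT_BASE, "dbserver1.inventory.customers")` should return a dictionary with Confluent-style fields such as `id`, `version` and `schema` if the lookup succeeds; if it fails, the HTTP error body may carry the same message ksqlDB reports.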
The configuration of the Debezium connector is as follows:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true"
  }
}
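The connector writes through Apicurio's native API (`/apis/registry/v2`), while ksqlDB reads through the Confluent-compatible one. ksqlDB's Avro format expects the Confluent wire format: a magic byte `0x00`, then a 4-byte big-endian schema ID, then the Avro binary body. Depending on its settings, the Apicurio converter may instead carry the schema reference in Kafka record headers or use a different ID layout, so it can help to check what is actually on the topic. A minimal Python sketch, assuming the raw value bytes of one record were obtained with any consumer that exposes them; the function name is hypothetical.

```python
import struct
from typing import Optional

CONFLUENT_MAGIC_BYTE = 0x00


def confluent_schema_id(payload: bytes) -> Optional[int]:
    """Return the schema ID if the payload looks Confluent-framed, else None.

    Confluent framing: 1 magic byte (0x00) + 4-byte big-endian schema ID,
    followed by the Avro-encoded body. This is only a heuristic: a headerless
    Avro body can also happen to start with 0x00.
    """
    if len(payload) < 5 or payload[0] != CONFLUENT_MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id
```

A result of `None` suggests the value does not carry a Confluent-style prefix, which ksqlDB's `AVRO` format would not be able to resolve against the registry.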
I have very little experience with Kafka, so any help would be appreciated.