It is being developed to attach smt (single message transforms) to the debezium mysql source connector. I implemented debezium smt by referring to the documentation, put the smt and mysql source connector jar files into the docker image of kafka connect, and then set the plug in path. When calling connector registration, the following error occurs.
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value com.github.gunbos.kafka.connect.smt.CustomTransformer for configuration transforms.router.type: Class com.github.gunbos.kafka.connect.smt.CustomTransformer could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
Excluding the transformation option, the mysql source connector runs fine. can you tell me what is the problem?? I've been struggling with this for days.
here is my docker, and sql connector option
FROM confluentinc/cp-kafka-connect-base:7.2.6
COPY debezium-connector-mysql/ /opt/custom-connector/debezium-connector-mysql
COPY transformer/ /opt/custom-transformer/custom-event-router
version: '2'
services:
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:7.2.6
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
container_name: kafka
image: confluentinc/cp-kafka:7.2.6
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka_ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
mysql:
image: "mysql:8.0.31"
ports:
- "3306:3306"
env_file:
- .env
restart: always
kafka_connect:
container_name: custom-connect
image: "my-custom-connect"
build:
dockerfile: ./Dockerfile
depends_on:
- kafka
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
CONNECT_REST_PORT: "8083"
CONNECT_GROUP_ID: "outbox"
CONNECT_CONFIG_STORAGE_TOPIC: "outbox-config"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: "outbox-offset"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: "outbox-status"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_PLUGIN_PATH: "/usr/share/java, /opt/custom-connector/, /opt/custom-transformer/"
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "cain",
"database.password": "cain",
"topic.prefix": "test",
"database.server.id": "123456",
"database.include.list": "kafka_connect",
"table.include.list": "kafka_connect.outbox",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions" : 1,
"schema.history.internal.kafka.topic": "schemaHistory.outbox",
"schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
"transforms": "router",
"transforms.router.type": "com.github.gunbos.kafka.connect.smt.CustomTransformer"
}
}
I would really appreciate it if you let me know. this is my github https://github.com/gunb0s/kafka-test