run debezum source conncector with custom single message transformer

133 Views Asked by At

It is being developed to attach smt (single message transforms) to the debezium mysql source connector. I implemented debezium smt by referring to the documentation, put the smt and mysql source connector jar files into the docker image of kafka connect, and then set the plug in path. When calling connector registration, the following error occurs.

{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value com.github.gunbos.kafka.connect.smt.CustomTransformer for configuration transforms.router.type: Class com.github.gunbos.kafka.connect.smt.CustomTransformer could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

Excluding the transformation option, the mysql source connector runs fine. can you tell me what is the problem?? I've been struggling with this for days.

here is my docker, and sql connector option

FROM confluentinc/cp-kafka-connect-base:7.2.6

COPY debezium-connector-mysql/ /opt/custom-connector/debezium-connector-mysql
COPY transformer/ /opt/custom-transformer/custom-event-router
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:7.2.6
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:7.2.6
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka_ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

  mysql:
    image: "mysql:8.0.31"
    ports:
      - "3306:3306"
    env_file:
      - .env
    restart: always

  kafka_connect:
    container_name: custom-connect
    image: "my-custom-connect"
    build:
      dockerfile: ./Dockerfile
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: "outbox"
      CONNECT_CONFIG_STORAGE_TOPIC: "outbox-config"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: "outbox-offset"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: "outbox-status"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_PLUGIN_PATH: "/usr/share/java, /opt/custom-connector/, /opt/custom-transformer/"
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "cain",
    "database.password": "cain",
    "topic.prefix": "test",
    "database.server.id": "123456",
    "database.include.list": "kafka_connect",
    "table.include.list": "kafka_connect.outbox",
    "topic.creation.default.replication.factor" : 1,
    "topic.creation.default.partitions" : 1,
    "schema.history.internal.kafka.topic": "schemaHistory.outbox",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "transforms": "router",
    "transforms.router.type": "com.github.gunbos.kafka.connect.smt.CustomTransformer"
  }
}

I would really appreciate it if you let me know. this is my github https://github.com/gunb0s/kafka-test

0

There are 0 best solutions below