Debezium / Kafka Connect neither creating topics nor pushing data to an existing topic


I have ZooKeeper, Kafka, and Kafka Connect running with the Debezium plugin, and I cannot see any errors. I have whitelisted a database and a couple of tables. After inserts or updates to those tables, no topic is created, and no data directory for the topics appears in /tmp/kafka-logs. I also tried manually creating a topic for one of the tables (a dummy table 't' with one column), but after inserting a row into t there is still no data in the topic's log. A screenshot of the log follows.

There is only one topic (the one I created, to see if it would be populated if I created it myself; I didn't expect it would be and it wasn't):

vagrant@coton:~$ ./kafka/kafka_2.13-2.8.0/bin/kafka-topics.sh --bootstrap-server 1.2.3.4:9092 --list
vagrant.bet.t
vagrant@coton:~$ 

[Screenshot: some Kafka Connect log output]

The connector status is running:

vagrant@coton:~$ curl -X GET http://localhost:8083/connectors/pg-sport-connector/ | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   940  100   940    0     0  50165      0 --:--:-- --:--:-- --:--:-- 52222
{
  "name": "pg-sport-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "postgres",
    "database.dbname": "sport",
    "transforms": "unwrap",
    "database.server.name": "vagrant",
    "database.port": "5432",
    "plugin.name": "pgoutput",
    "table.whitelist": "bet.event,bet.t",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "decimal.handling.mode": "string",
    "database.hostname": "localhost",
    "database.password": "",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "name": "pg-sport-connector",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.whitelist": "sport",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter"
  },
  "tasks": [
    {
      "connector": "pg-sport-connector",
      "task": 0
    }
  ],
  "type": "source"
}

vagrant@coton:~$ curl -X GET http://localhost:8083/connectors/pg-sport-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   172  100   172    0     0  10876      0 --:--:-- --:--:-- --:--:-- 11466
{
  "name": "pg-sport-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}

Running Java 8. Thanks.


There are 2 best solutions below

Check the "table.include.list" parameter. In my case, the problem was that I had set its value to "test" (my table name), but it should have been the schema-qualified name "public.test".
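
To illustrate the point above, here is a minimal sketch of a sanity check for a table list. The function name unqualified_tables is hypothetical (it is not part of Debezium or Kafka Connect), and the check is only a heuristic: it treats any entry containing a dot as schema-qualified, which may not hold for every regular expression you could put in the list.

```shell
# Hypothetical helper (not part of Debezium or Kafka Connect): prints every
# entry of a comma-separated table list that has no "schema." prefix.
unqualified_tables() {
  printf '%s\n' "$1" | tr ',' '\n' | while IFS= read -r entry; do
    # Trim leading and trailing whitespace around the entry.
    entry=$(printf '%s' "$entry" | sed 's/^[[:space:]]*//; s/[[:space:]]*$//')
    case "$entry" in
      '')  ;;                        # ignore empty entries
      *.*) ;;                        # contains a dot: assumed to be schema.table
      *)   printf '%s\n' "$entry" ;; # bare table name: report it
    esac
  done
}

unqualified_tables "test"              # prints: test
unqualified_tables "public.test"       # prints nothing
unqualified_tables "bet.event,bet.t"   # prints nothing
```

By this check, the value "bet.event,bet.t" from the question is already schema-qualified, so this particular mistake may not be the cause there.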

Try adding this to your config:

"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.your_topic_name"

Then list the topics to check your topic name:

./kafka/kafka_2.13-2.8.0/bin/kafka-topics.sh --zookeeper localhost:2181 --list

After that, you can consume the topic to check for any changes in the tables:

./kafka/kafka_2.13-2.8.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic your_topic_name
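
If you are not sure what to use for your_topic_name, this sketch derives the names to look for. It assumes the connector's default naming of server name, schema, and table joined by dots, which matches the topic vagrant.bet.t in the question. The function name expected_topics is hypothetical, and the sketch only handles plain schema.table entries, not regular expressions.

```shell
# Hypothetical helper: build the topic names expected under the assumed
# default naming <database.server.name>.<schema>.<table>.
expected_topics() {
  server_name=$1   # value of database.server.name
  table_list=$2    # comma-separated schema.table entries
  printf '%s\n' "$table_list" | tr ',' '\n' | while IFS= read -r table; do
    if [ -n "$table" ]; then
      printf '%s.%s\n' "$server_name" "$table"
    fi
  done
}

expected_topics "vagrant" "bet.event,bet.t"
# vagrant.bet.event
# vagrant.bet.t
```

Compare these names with the output of the kafka-topics.sh --list command above, and pass one of them as --topic to the console consumer.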