Problem
I am new to Kafka and Debezium. I have successfully created a connector for MySQL, using the sample Debezium MySQL connector configuration to capture change events from some tables.
My registered MySQL Debezium connector looks as follows:

Kafka Connect:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
  name: debezium-mysql
spec:
  config:
    status.storage.topic: debezium-mysql-status
    status.storage.replication.factor: 3
    offset.storage.topic: debezium-mysql-offsets
    value.converter: org.apache.kafka.connect.json.JsonConverter
    max.request.size: 5242880
    group.id: debezium-mysql
    value.converter.schemas.enable: false
    config.storage.replication.factor: 3
    config.storage.topic: debezium-mysql-configs
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    offset.storage.replication.factor: 3
    config.storage.cleanup.policy: compact
    listeners: 0.0.0.0
  bootstrapServers: "es-kafka-bootstrap.tools.svc:9095"
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        key: connect-metrics-config.yaml
        name: metrics-config
  resources:
    limits:
      cpu: 500m
      memory: 1Gi
    requests:
      cpu: 200m
      memory: 512Mi
  externalConfiguration:
    volumes:
      - name: kafka-tls-authen
        secret:
          secretName: kafka-admin-tls
  authentication:
    passwordSecret:
      password: password
      secretName: kafka-admin
    type: scram-sha-512
    username: kafka-admin
  template:
    buildConfig:
      pullSecret: ibm-entitlement-key
    pod:
      imagePullSecrets:
        - name: gitlab-cp4i-access-token
      metadata:
        annotations:
          cloudpakId: c8b82d189e7545f0892db9ef2731b90d
          productVersion: 11.1.6
          productID: 2a79e49111f44ec3acd89608e56138f5
          cloudpakName: IBM Cloud Pak for Integration
          productChargedContainers: debezium-mysql-connect
          productCloudpakRatio: "2:1"
          productName: IBM Event Streams for Non Production
          eventstreams.production.type: CloudPakForIntegrationNonProduction
          productMetric: VIRTUAL_PROCESSOR_CORE
  image: >-
    registry.pvcb.vn/pvcombank/cntt/cdsvk/cp4i/event-streams/connector/debezium-connectors@sha256:9fe88f72495be5f670059a57f5d32738b759ea0d0d3266ac880e1a3c32a66ffb
  replicas: 1
```
Kafka Connector:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-source-dbx
  labels:
    eventstreams.ibm.com/cluster: debezium-mysql
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    schema.history.internal.consumer.sasl.mechanism: SCRAM-SHA-512
    value.converter: org.apache.kafka.connect.json.JsonConverter
    table.include.list: >-
      dbxdb.billpaytransfers,dbxdb.interbankfundtransfers,dbxdb.fab_recurringbill,dbxdb.customer,dbxdb.intrabanktransfers,dbxdb.ownaccounttransfers
    schema.history.internal.producer.sasl.mechanism: SCRAM-SHA-512
    database.hostname: dbx-uat-sql.pvcb.vn
    value.converter.schemas.enable: false
    database.password: zM8aNyoczssRhizWFUGh
    schema.history.internal.consumer.sasl.jaas.config: >-
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="kafka-admin" password="kbM1eQ9UyEyNykhLf1dWE09GVj53fn2w";
    topic.prefix: dr-hci-ocp-uat
    key.converter: org.apache.kafka.connect.json.JsonConverter
    schema.history.internal.producer.sasl.jaas.config: >-
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="kafka-admin" password="kbM1eQ9UyEyNykhLf1dWE09GVj53fn2w";
    database.server.id: "1082420516"
    database.port: 3306
    database.serverTimezone: Asia/Ho_Chi_Minh
    schema.history.internal.producer.security.protocol: SASL_PLAINTEXT
    database.include.list: "dbxdb"
    schema.history.internal.consumer.security.protocol: SASL_PLAINTEXT
    include.schema.changes: "true"
    schema.history.internal.kafka.topic: schemahistory.mysql.source.dbx
    topic.creation.default.replication.factor: 3
    topic.creation.default.partitions: 5
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: lz4
    database.user: infinityuatmysql
    schema.history.internal.kafka.bootstrap.servers: "es-kafka-bootstrap.tools.svc:9095"
    decimal.handling.mode: double
    schema.history.internal.kafka.recovery.poll.interval.ms: 1000
    schema.history.internal.kafka.recovery.attempts: 500
    snapshot.mode: schema_only_recovery
  tasksMax: 1
```
Environment

MySQL runs on Amazon RDS. I deployed my connector in an OpenShift cluster, and the Kafka cluster is available.

Steps to deploy CDC for MySQL

3.1. Set up MySQL

· Enable binlog with format ROW
· Set binlog retention to 7 days

Link: https://debezium.io/documentation/reference/stable/connectors/mysql.html#setting-up-mysql

3.2. Deploy a Kafka Connect cluster

3.3. Register a MySQL connector
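The MySQL prerequisites above can be double-checked from a client session before registering the connector. This is a diagnostic sketch; note that `mysql.rds_show_configuration` is specific to Amazon RDS:

```sql
-- Debezium requires the binlog to be enabled and in ROW format
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

-- On Amazon RDS: show the currently configured binlog retention (in hours)
CALL mysql.rds_show_configuration;
```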
Log and Error Message

Can't compare binlog filenames with different base names

I suspect this is the "MySQL purges binlog files used by Debezium" failure mode.
Link: https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-purges-binlog-files-used-by-debezium

5. Data of MySQL Debezium Connector
```json
{
  "before": null,
  "after": { "transactionId": 2, "featureActionId": "BILL_PAY_CREATE", "transactionType": "Topup", "transactionCurrency": null, "companyId": "7001036206_12267032", "roleId": "a6861dfb-0f34-4c08-8262-d605d4f6dd65", "payeeId": null, "billerId": "0333394358", "frequencyTypeId": null, "requestId": 6329215609472, "fromAccountNumber": "1080010066497", "toAccountNumber": "107000664893", "amount": 10000.0, "status": "Executed", "confirmationNumber": "FT23**47BWBJ", "description": null, "notes": "NAP TIEN DIEN THOAI TRA TRUOC 0333394358", "transactionts": null, "frequencystartdate": null, "frequencyenddate": null, "numberOfRecurrences": null, "scheduledDate": "2024-01-11T21:18:53Z", "deliverBy": null, "zipCode": null, "processingDate": null, "personId": null, "fromNickName": "Khach hang 12456032", "fromAccountType": null, "day1": null, "day2": null, "toAccountType": null, "payPersonName": null, "securityQuestion": null, "SecurityAnswer": null, "checkImageBack": null, "payeeName": null, "profileId": null, "cardNumber": null, "cardExpiry": null, "isScheduled": null, "createdby": "1897236820", "modifiedby": null, "createdts": "2024-01-11T14:19:09Z", "lastmodifiedts": "2024-01-11T07:19:09Z", "synctimestamp": "2024-01-11T07:19:09Z", "softdeleteflag": 0, "serviceCharge": "0.0", "transactionAmount": "10000.0", "convertedAmount": null, "paidBy": null, "swiftCode": null, "traceNumberPay": "946693", "referenceNumberPay": "2024011185910039", "partnerCode": "ESTIO001", "responseDesc": "N/A", "serviceCode": "100001", "providerCode": "", "customerCode": "0333394358", "responseCode": "00" },
  "source": { "version": "2.3.0.Final", "connector": "mysql", "name": "dr-hci-ocp-uat", "ts_ms": 1705998639000, "snapshot": "true", "db": "dbxdb", "sequence": null, "table": "billpaytransfers", "server_id": 0, "gtid": null, "file": "mysql-bin-changelog.000375", "pos": 12163, "row": 0, "thread": null, "query": null },
  "op": "c",
  "ts_ms": 1705998639170,
  "transaction": null
}
```
I think the MySQL connector fails because "Debezium is unable to start if the binlog has been purged". As a workaround, I emptied the messages in the "debezium-mysql-offsets" topic and then deleted the Kafka Connect pod to restart it; after that, my connector was ready again. But I know this is only a temporary solution: after a period of time the connector stops being ready, and I have to delete the messages in "debezium-mysql-offsets" and restart the Kafka Connect pod again so that the MySQL connector becomes ready. This, in turn, poses an issue for the at-least-once delivery guarantee. Moreover, I tried to mitigate the issue by increasing the binlog retention:
```sql
CALL mysql.rds_set_configuration('binlog retention hours', 168);
```
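One observation about the connector config above, offered as a hedged hypothesis rather than a confirmed diagnosis: with `snapshot.mode: schema_only_recovery`, Debezium only rebuilds its schema history and then tries to resume from the binlog position stored in the offsets topic; if that binlog file has already been purged by the server, startup fails. Debezium also documents a `when_needed` mode that takes a fresh snapshot whenever the stored offset is no longer usable (please verify the exact mode names and semantics against the documentation for your Debezium 2.3 version):

```yaml
# Hypothetical alternative, not a confirmed drop-in fix: re-snapshot
# automatically when the recorded binlog position has been purged.
snapshot.mode: when_needed
```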
I would like to understand the root cause of this issue and how to fix the MySQL connector so that at-least-once delivery is preserved.