My JDBC Source Connector config is:
{
  "name": "vcs.transactions.ke.source_connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "quote.sql.identifiers": "never",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id_transaction",
    "tasks.max": "1",
    "internal.key.converter.schemas.enable": "false",
    "mode": "timestamp+incrementing",
    "topic.prefix": "vcs.transactions_approved.ke.topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "validate.non.null": "false",
    "query": "SELECT * FROM transaction",
    "connection.attempts": "10",
    "batch.max.rows": "100",
    "timestamp.delay.interval.ms": "0",
    "db.timezone": "Africa/Nairobi",
    "connection.backoff.ms": "100",
    "poll.interval.ms": "100",
    "value.converter.schemas.enable": "false",
    "connection.url": "jdbc:mysql://<host>:3306/<db_name>?user=<user>&password=<password>",
    "numeric.mapping": "best_fit",
    "numeric.precision.mapping": "true"
  }
}
Using:
"db.timezone": "Africa/Nairobi"
The Africa/Nairobi timezone is UTC+3.
In theory, the connector should apply this timezone when fetching data, but that is not what I observe.
Example:
-- (1) now()
update transaction t set updated_at = now() where t.id_transaction = 1;
-- (2) now() + INTERVAL 1 HOUR
update transaction t set updated_at = now() + INTERVAL 1 HOUR where t.id_transaction = 2;
-- (3) now() + INTERVAL 2 HOUR
update transaction t set updated_at = now() + INTERVAL 2 HOUR where t.id_transaction = 3;
-- (4) now() + INTERVAL 3 HOUR
update transaction t set updated_at = now() + INTERVAL 3 HOUR where t.id_transaction = 4;
-- (5) now() + INTERVAL 4 HOUR
update transaction t set updated_at = now() + INTERVAL 4 HOUR where t.id_transaction = 5;
When I create the connector, I would expect it to fetch row (4), the one whose updated_at matches the timezone offset, but it fetches row (1) instead. In the payload, the updated_at field does not hold the Unix timestamp of the stored value; it holds a value 3 hours earlier. The connect_offset topic shows the same behavior: its timestamp is also 3 hours earlier.
Assuming now() is 1707477449 => 2024-02-09 11:17:29
Message on topic connect_offset:
{
"timestamp_nanos": 0,
"incrementing": 1,
"timestamp": 1707466649
}
Here timestamp is 1707466649, which is 2024-02-09 08:17:29, i.e. 3 hours earlier than the database record. I was expecting 3 hours later.
It seems the timezone offset is being applied in reverse.
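To make the arithmetic concrete, here is a small Python sketch I used as a sanity check. It is not taken from the connector code. It assumes the stored wall-clock value for row (1) is 2024-02-09 11:17:29 and models Africa/Nairobi as a fixed UTC+3 offset; the variable names are my own.

```python
from datetime import datetime, timedelta, timezone

# Africa/Nairobi is UTC+3; a fixed offset is used here instead of a
# tz database lookup (an assumption made to keep the sketch simple).
NAIROBI = timezone(timedelta(hours=3))

# Wall-clock value stored in updated_at for row (1).
wall_clock = datetime(2024, 2, 9, 11, 17, 29)

# Reading the wall clock as UTC gives the value I assumed for now().
as_utc = int(wall_clock.replace(tzinfo=timezone.utc).timestamp())

# Reading the same wall clock as Nairobi local time and converting it
# to epoch seconds gives the value seen in connect_offset.
as_nairobi = int(wall_clock.replace(tzinfo=NAIROBI).timestamp())

print(as_utc)               # 1707477449
print(as_nairobi)           # 1707466649
print(as_utc - as_nairobi)  # 10800 seconds, i.e. 3 hours
```

So the observed offset is what I would get if the connector treated updated_at as already being in Africa/Nairobi local time and converted it to UTC, though I have not confirmed that this is what it actually does.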
Note: the database is MySQL.
Does anyone have the same setup? Is this a bad config? Should the timezone be defined in the connection URL instead?
I have also posted this issue at https://github.com/confluentinc/kafka-connect-jdbc/issues/1397