I am using the Snowflake Kafka connector to sink messages to Snowflake.
Docker image: cp-kafka-connect-base:6.2.0
I have two consumer pods running in distributed mode. The connector config is below:
connector.class: "com.snowflake.kafka.connector.SnowflakeSinkConnector"
tasks.max: "2"
topics: "test-topic"
snowflake.topic2table.map: "test-topic:table1"
buffer.count.records: "500000"
buffer.flush.time: "240"
buffer.size.bytes: "100000000"
snowflake.url.name: "<url>"
snowflake.warehouse.name: "name"
snowflake.user.name: "username"
snowflake.private.key: "key"
snowflake.private.key.passphrase: "pass"
snowflake.database.name: "db-name"
snowflake.schema.name: "schema-name"
key.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
value.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
envs:
CONNECT_GROUP_ID: "testgroup"
CONNECT_CONFIG_STORAGE_TOPIC: "snowflakesync-config"
CONNECT_STATUS_STORAGE_TOPIC: "snowflakesync-status"
CONNECT_OFFSET_STORAGE_TOPIC: "snowflakesync-offset"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_REST_PORT: "8083"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_OFFSET_FLUSH_INTERVAL_MS: "5000"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECTOR_NAME: "test-conn"
I am running two pods with the above config. Each pod is properly attached to one partition and starts consuming.
Question: Whenever I deploy or restart the pods, the offsets are committed (CURRENT-OFFSET is updated) only ONCE. After that, the sink connector keeps consuming messages from the topic, but CURRENT-OFFSET is never updated again (offsets are not being committed).
kafka-consumer-groups --bootstrap-server <server> --describe --group connect-test-conn
This is the command I use to check whether CURRENT-OFFSET is being updated. Since CURRENT-OFFSET is updated only once, the command always shows a lag, and the lag keeps increasing.
However, I can see from the logs (put records) and from Snowflake that the events are being persisted.
I would like to know why the offsets are not being committed continuously.
Example case (output of the consumer-group command):
TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                            HOST          CLIENT-ID
events-sync  0          6408022         25524319        19116297  connector-consumer-events-sync-0-b9142c5f-3bb7-47b1-bd44-a169a7984952  /xx.xx.xx.xx  connector-consumer-events-sync-0
events-sync  1          25521059        25521202        143       connector-consumer-events-sync-1-107f2aa8-969c-4d7e-87f8-fdb2be2480b3  /xx.xx.xx.xx  connector-consumer-events-sync-1
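
For reference, this is how I read the numbers above. It is only a minimal Python sketch, assuming LAG is simply LOG-END-OFFSET minus CURRENT-OFFSET. The names `Snapshot`, `compute_lag` and `stalled_partitions` are my own illustrative helpers, not part of any Kafka tooling. The second helper shows the check I do by hand when I run the command twice: a partition counts as stalled if its committed offset did not move while the log end advanced.

```python
from typing import Dict, List, Tuple

# Hypothetical snapshot type: (topic, partition) -> (current_offset, log_end_offset)
Snapshot = Dict[Tuple[str, int], Tuple[int, int]]

# Values copied from the consumer-group output above.
SNAPSHOT: Snapshot = {
    ("events-sync", 0): (6408022, 25524319),
    ("events-sync", 1): (25521059, 25521202),
}


def compute_lag(snapshot: Snapshot) -> Dict[Tuple[str, int], int]:
    """Lag per partition, assumed to be LOG-END-OFFSET - CURRENT-OFFSET."""
    return {tp: end - current for tp, (current, end) in snapshot.items()}


def stalled_partitions(before: Snapshot, after: Snapshot) -> List[Tuple[str, int]]:
    """Partitions whose committed offset did not move while the log end advanced."""
    stalled = []
    for tp, (current_after, end_after) in after.items():
        if tp not in before:
            continue  # partition was not present in the earlier snapshot
        current_before, end_before = before[tp]
        if current_after == current_before and end_after > end_before:
            stalled.append(tp)
    return sorted(stalled)


if __name__ == "__main__":
    for (topic, partition), lag in compute_lag(SNAPSHOT).items():
        print(f"{topic}-{partition}: lag={lag}")
```

Run on the snapshot above, this gives a lag of 19116297 for partition 0 and 143 for partition 1, which matches the LAG column. Passing two snapshots taken a few minutes apart to `stalled_partitions` would list the partitions where commits have stopped.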