Kafka sink connector not committing offsets but consuming messages

I am using Kafka Connect with the Snowflake sink connector to sink messages into Snowflake. Docker image: cp-kafka-connect-base:6.2.0

I have two consumer pods running in distributed mode. The connector config is below:

  connector.class: "com.snowflake.kafka.connector.SnowflakeSinkConnector"
  tasks.max: "2"
  topics: "test-topic"
  snowflake.topic2table.map: "test-topic:table1"
  buffer.count.records: "500000"
  buffer.flush.time: "240"
  buffer.size.bytes: "100000000"
  snowflake.url.name: "<url>"
  snowflake.warehouse.name: "name"
  snowflake.user.name: "username"
  snowflake.private.key: "key"
  snowflake.private.key.passphrase: "pass"
  snowflake.database.name: "db-name"
  snowflake.schema.name: "schema-name"
  key.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
  value.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"

Worker environment variables:

  CONNECT_GROUP_ID: "testgroup" 
  CONNECT_CONFIG_STORAGE_TOPIC: "snowflakesync-config"
  CONNECT_STATUS_STORAGE_TOPIC: "snowflakesync-status"
  CONNECT_OFFSET_STORAGE_TOPIC: "snowflakesync-offset"
  CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
  CONNECT_REST_PORT: "8083"
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: "5000"
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
  CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECTOR_NAME: "test-conn"
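For reference, here is a minimal Python sketch of registering the connector properties above through the Kafka Connect REST API (`PUT /connectors/<name>/config`, which creates the connector or updates its config). It assumes the worker is reachable at `http://localhost:8083` (from `CONNECT_REST_ADVERTISED_HOST_NAME` and `CONNECT_REST_PORT`) and that the connector is named `test-conn` (from `CONNECTOR_NAME`). The helper `build_put_config_request` is a hypothetical name, the secret values are the placeholders from the config, and the request is only built, not sent.

```python
import json
import urllib.request

# Connector properties copied from the config above (secrets are placeholders).
CONNECTOR_CONFIG = {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "2",
    "topics": "test-topic",
    "snowflake.topic2table.map": "test-topic:table1",
    "buffer.count.records": "500000",
    "buffer.flush.time": "240",
    "buffer.size.bytes": "100000000",
    "snowflake.url.name": "<url>",
    "snowflake.warehouse.name": "name",
    "snowflake.user.name": "username",
    "snowflake.private.key": "key",
    "snowflake.private.key.passphrase": "pass",
    "snowflake.database.name": "db-name",
    "snowflake.schema.name": "schema-name",
    "key.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
}


def build_put_config_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/{name}/config request.

    Hypothetical helper: the body is the flat config map, sent as JSON.
    """
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    req = build_put_config_request("http://localhost:8083", "test-conn", CONNECTOR_CONFIG)
    print(req.get_method(), req.full_url)
    # To actually send it to a running worker:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, resp.read().decode())
```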

I am running two pods with the above config. Each pod is assigned one partition and starts consuming.

Question: Whenever I deploy or restart the pods, the offsets are committed (CURRENT-OFFSET is updated) only ONCE. After that, the sink connector keeps consuming messages from the topic, but CURRENT-OFFSET is never updated again (offsets are no longer committed).

kafka-consumer-groups --bootstrap-server <server>  --describe --group connect-test-conn

This is the command I use to check whether CURRENT-OFFSET is being updated. Because CURRENT-OFFSET is updated only once, the output always shows a lag, and the lag keeps increasing.
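The symptom, a CURRENT-OFFSET that stops moving while LOG-END-OFFSET keeps growing, can be checked mechanically by comparing two runs of the describe command. Below is a minimal sketch. It assumes each run has already been reduced to a mapping of partition to (CURRENT-OFFSET, LOG-END-OFFSET). It also assumes the two runs are taken further apart than `buffer.flush.time` (240 s here), so that normal buffering alone should not explain an unchanged offset. `stalled_partitions` is a hypothetical helper name.

```python
from typing import Dict, List, Tuple

# partition -> (CURRENT-OFFSET, LOG-END-OFFSET) from one describe run
Snapshot = Dict[int, Tuple[int, int]]


def stalled_partitions(before: Snapshot, after: Snapshot) -> List[int]:
    """Return partitions whose committed offset did not move while new records arrived."""
    stalled = []
    for partition, (cur_before, end_before) in before.items():
        if partition not in after:
            continue  # partition missing from the second run; nothing to compare
        cur_after, end_after = after[partition]
        # New records were produced, yet the committed offset is unchanged.
        if cur_after == cur_before and end_after > end_before:
            stalled.append(partition)
    return sorted(stalled)


if __name__ == "__main__":
    # Hypothetical snapshots: partition 0 is stuck, partition 1 advances normally.
    before = {0: (100, 200), 1: (50, 60)}
    after = {0: (100, 350), 1: (70, 80)}
    print(stalled_partitions(before, after))
```

With these hypothetical numbers the function reports `[0]`. A partition with no new records is not flagged, because an idle partition has nothing new to commit.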

However, I can see in the logs (put records) and in Snowflake that the events are being persisted.

I would like to know why the offsets are not being committed continuously.

Example (output of the consumer-group command):

TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                            HOST          CLIENT-ID
events-sync  0          6408022         25524319        19116297  connector-consumer-events-sync-0-b9142c5f-3bb7-47b1-bd44-a169a7984952  /xx.xx.xx.xx  connector-consumer-events-sync-0
events-sync  1          25521059        25521202        143       connector-consumer-events-sync-1-107f2aa8-969c-4d7e-87f8-fdb2be2480b3  /xx.xx.xx.xx  connector-consumer-events-sync-1
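The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET. For partition 0 that is 25524319 minus 6408022, which equals 19116297. Below is a small Python sketch that parses rows like the ones above and recomputes the lag. It assumes the column order shown here; some Kafka versions print an extra leading GROUP column, which would shift the indices. `parse_describe_output` and `PartitionLag` are hypothetical names.

```python
from typing import List, NamedTuple


class PartitionLag(NamedTuple):
    topic: str
    partition: int
    current_offset: int
    log_end_offset: int
    reported_lag: int

    @property
    def computed_lag(self) -> int:
        # LAG = LOG-END-OFFSET - CURRENT-OFFSET
        return self.log_end_offset - self.current_offset


def parse_describe_output(text: str) -> List[PartitionLag]:
    """Parse `--describe` rows (TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...)."""
    rows = []
    for line in text.strip().splitlines():
        fields = line.split()
        if not fields or fields[0] == "TOPIC":
            continue  # skip blank lines and the header
        topic, partition, current, log_end, lag = fields[:5]
        if current == "-":
            continue  # no committed offset for this partition yet
        rows.append(PartitionLag(topic, int(partition), int(current), int(log_end), int(lag)))
    return rows


# The two rows from the output above (consumer IDs shortened; only the first five columns are used).
SAMPLE = """
TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST          CLIENT-ID
events-sync  0          6408022         25524319        19116297  consumer-0   /xx.xx.xx.xx  connector-consumer-events-sync-0
events-sync  1          25521059        25521202        143       consumer-1   /xx.xx.xx.xx  connector-consumer-events-sync-1
"""

if __name__ == "__main__":
    for row in parse_describe_output(SAMPLE):
        print(row.partition, row.computed_lag, row.computed_lag == row.reported_lag)
```

For the sample this prints `0 19116297 True` and `1 143 True`. The reported lag is consistent with the offsets, so the growing lag on partition 0 reflects an old committed offset, not a display issue.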