Dynamic creation of Kafka Connectors


I have deployed a Kafka cluster and a Kafka Connect cluster on Kubernetes (AKS), using Strimzi. I wanted to start reading from RSS resources to feed my Kafka cluster, so I created a connector instance of "org.kaliy.kafka.connect.rss.RssSourceConnector" which reads from a specific RSS feed, given a URL, and writes to a specific topic. But my whole intention with this is to eventually have a Kafka Connect cluster able to manage a lot of external requests for new RSS feeds to read from; and this is where all my doubts come in:

  • Should I create an instance of the Kaliy RSS connector for each RSS feed? Or would it be better to implement my own connector, so that I create only one instance of it and each time I want to read a new RSS feed I create a new Task in the connector? (See the config sketch after this list.)
  • Who should be responsible for ensuring that the Kafka Connect cluster's state is the desired one? I mean that if a Connector (in the case of 1 RSS feed : 1 Connector instance) stopped working, who should try to start it again? An external client via the Kafka Connect REST API? Kubernetes itself?
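
On the first point, the Kaliy connector seems to offer a middle ground already: its README describes rss.urls as a space-separated list of feed URLs, so a single instance can poll several feeds, with tasks.max bounding the parallelism. A sketch of such a configuration (the property names are taken from that README and the URLs are placeholders, so treat both as assumptions):

{
  "name": "rss-feeds",
  "config": {
    "connector.class": "org.kaliy.kafka.connect.rss.RssSourceConnector",
    "tasks.max": "2",
    "rss.urls": "https://example.org/feed-a.xml https://example.org/feed-b.xml",
    "topic": "rss-items"
  }
}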

Right now, I think my best option is to rely on the Kafka Connect REST API, making an external client responsible for managing the state of the set of connectors, but I don't know if it was designed to receive as many requests as would be the case here. Maybe it could be scaled by provisioning several listeners in the Kafka Connect REST API configuration, but I do not know. Thanks a lot!
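
For illustration, these are the kinds of calls such an external client would issue against the Connect REST API (the host assumes Strimzi's default <cluster-name>-connect-api service naming, and the connector name matches the sketch above):

# Register a new connector instance, using a JSON config like the one above
$ curl -X POST -H "Content-Type: application/json" \
    --data @rss-feeds.json \
    http://my-connect-cluster-connect-api:8083/connectors

# Check connector and task state
$ curl http://my-connect-cluster-connect-api:8083/connectors/rss-feeds/status

# Restart a failed connector
$ curl -X POST http://my-connect-cluster-connect-api:8083/connectors/rss-feeds/restart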

2 Answers

One of the main benefits of using Kafka Connect is its configuration-driven approach, and you would lose this by implementing your own connector. In my opinion, the best strategy is to have one Connector instance for each RSS feed. Reducing the number of instances could make sense when there is a single data source system, to avoid overloading it.
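
With Strimzi you can keep this per-feed pattern declarative: each feed becomes one small KafkaConnector custom resource that the operator applies to the Connect cluster for you. A minimal sketch, assuming a Connect cluster named my-connect-cluster and the Kaliy property names (both assumptions):

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: rss-source-feed-a
  labels:
    # must match the name of the KafkaConnect resource, which in turn
    # needs the strimzi.io/use-connector-resources: "true" annotation
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.kaliy.kafka.connect.rss.RssSourceConnector
  tasksMax: 1
  config:
    rss.urls: "https://example.org/feed-a.xml"
    topic: rss-items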

Using the Strimzi operator, the Kafka Connect cluster is monitored and the operator will try to restore the desired cluster state when needed. This does not include the individual Connector instances and their tasks, but you can leverage the Kubernetes API to monitor the KafkaConnector custom resource (CR) status, instead of the REST API.

Example:

$ kubectl get kafkaconnector amq-sink -o yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
# ...
status:
  conditions:
  - lastTransitionTime: "2020-12-07T10:30:28.349Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: RUNNING
      worker_id: 10.116.0.66:8083
    name: amq-sink
    tasks:
    - id: 0
      state: RUNNING
      worker_id: 10.116.0.66:8083
    type: sink
  observedGeneration: 1
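
Building on that status output, standard kubectl is enough for an external health check or a simple controller, for example:

# Block until the connector reports Ready (or time out)
$ kubectl wait kafkaconnector/amq-sink --for=condition=Ready --timeout=60s

# Watch for state changes continuously
$ kubectl get kafkaconnector amq-sink -w

# On recent Strimzi versions, ask the operator to restart a failed connector
$ kubectl annotate kafkaconnector amq-sink strimzi.io/restart=true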

It may be late, but this could help anyone who passes by this question: it is also worth having a look at the Connector CR (custom resource) that is part of Confluent for Kubernetes (CFK). It introduces a clear-cut declarative way to manage and monitor connectors, with health checks and auto-healing.

https://www.confluent.io/blog/declarative-connectors-with-confluent-for-kubernetes/
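
For example, a CFK-managed connector is itself a small custom resource, roughly like the sketch below; the API version and field names here are recalled from the CFK docs rather than taken from the post above, so verify them against the reference for your release:

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: rss-source
spec:
  class: org.kaliy.kafka.connect.rss.RssSourceConnector
  taskMax: 1
  configs:
    rss.urls: "https://example.org/feed-a.xml"
    topic: rss-items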