How to do some kafka commands in ballerina service api

243 Views Asked by At

Is it possible to achieve this in ballerina

  1. To create a new kafka topic in ballerina
  2. To list available topics in Ballerina
  3. Subscribe to a created topic in ballerina
2

There are 2 best solutions below

3
On

You can subscribe to a topic using the following code:

import ballerina/log;
import wso2/kafka;
import ballerina/internal;

// Kafka consumer endpoint
endpoint kafka:SimpleConsumer consumer {
    bootstrapServers: "localhost:9092, localhost:9093",
    // Consumer group ID
    groupId: "test-group",
    // Listen from topic 'test'
    topics: ["test"],
    // Poll every 1 second
    pollingInterval:1000
};

// Kafka service that listens from the topic 'product-price'
// 'inventoryControlService' subscribed to new product price updates from
// the product admin and updates the Database.
service<kafka:Consumer> kafkaService bind consumer {
    // Triggered whenever a message added to the subscribed topic
    onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // Dispatched set of Kafka records to service, We process each one by one.
        foreach entry in records {
            byte[] serializedMsg = entry.value;
            // Convert the serialized message to string message
            string msg = internal:byteArrayToString(serializedMsg, "UTF-8");
            log:printInfo("New message received from the product admin");
            // log the retrieved Kafka record
            log:printInfo("Topic: " + entry.topic + "; Received Message: " + msg);
            // Mock logic
            // Update the database with the new price for the specified product
            log:printInfo("Database updated with the new price of the product");
        }
    }
}

This Github repo might be quite useful for you. It contains various examples for both consumers and producers.

Regarding your questions for creating and listing topics, if you don't need to perform these actions from Ballerina, you can do so from your command line:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor <number_of_replicas> --partitions <number_of_partitions> --topic test
0
On

EDIT: Update the sample codes to comply with the latest ballerina version (from V1.2.0 upwards).

You can

  1. Create a new topic

If you send data using a Kafka producer, it will publish data to that particular topic, and if the topic is not available, it will create the topic, and publish. (To support this, you have to set auto.create.topics.enable=true in the broker properties).

Consider you want to publish to the topic test from a producer. You can create a producer endpoint called kafka:Producer and send data to a particular topic using send() function.

kafka:Producer sampleProducer = new ({
  bootstrapServers: "localhost:9092",
  acks: "all",
  valueSerializerType: kafka:SER_STRING
});

string topic = "test";
string msg = "Your Message";
sampleProducer->send(messageToPublish, topic);`

Now if there is a topic called test is available for the Kafka broker hosted at localhost:9092, it will publish the message to the topic, or it will create the topic, if it doesn't exist.

  1. Subscribe to a new topic

You can use subscribe() function of Kafka:Consumer to subscribe to a topic.

listener kafka:Consumer sampleConsumer = new ({
  bootstrapServers: "localhost:9090",
  groupId: "test-consumers",
  valueDeserializerType: kafka:DES_STRING
});

string topic = "test";
string[] topics = [topic];
sampleConsumer->subscribe(topics);

Please note that the subscribe() takes string[] as the input parameter, hence you should pass a string[] to it.

There are other functions such as subscribeToPattern(), subscribeWithPartitionRebalance() which are also can be used to subscribe a consumer to a topic, you can find more about them in the API Documentation.

But to list the available topics, you need to get the list of topics from the Kafka broker itself. But you can get a list of topics, which is currently subscribed by a particular consumer, using ballerina.

string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
  // Your logic for handling the error
} else {
    subscribedTopics = result;
}

Make sure to handle the error here, as the getSubscription() can return either a string[] or an error. Ballerina type guard can do the trick for you.