Is it possible to achieve the following in Ballerina?
- Create a new Kafka topic in Ballerina
- List the available topics in Ballerina
- Subscribe to a created topic in Ballerina
EDIT: Updated the sample code to comply with the latest Ballerina versions (from v1.2.0 onwards).
You can. If you send data using a Kafka producer, it publishes the data to the given topic; if the topic does not exist, the broker creates it and then publishes. (To support this, you have to set auto.create.topics.enable=true in the broker properties.)
Suppose you want to publish to the topic test from a producer. You can create a producer endpoint of type kafka:Producer and send data to a particular topic using the send() function.
kafka:Producer sampleProducer = new ({
    bootstrapServers: "localhost:9092",
    acks: "all",
    valueSerializerType: kafka:SER_STRING
});
string topic = "test";
string msg = "Your Message";
// Pass the message first, then the topic name; keep the result so a send error can be handled.
var sendResult = sampleProducer->send(msg, topic);
Now, if a topic called test exists on the Kafka broker hosted at localhost:9092, the message is published to that topic; if it doesn't exist, the topic is created first.
You can use the subscribe() function of kafka:Consumer to subscribe to a topic.
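listener kafka:Consumer sampleConsumer = new ({
    // Same broker address the producer publishes to.
    bootstrapServers: "localhost:9092",
    groupId: "test-consumers",
    valueDeserializerType: kafka:DES_STRING
});
string topic = "test";
string[] topics = [topic];
// Keep the result so a subscription error can be handled.
var subscribeResult = sampleConsumer->subscribe(topics);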
Please note that subscribe() takes a string[] as its input parameter, so you must pass a string[] to it.
There are other functions, such as subscribeToPattern() and subscribeWithPartitionRebalance(), which can also be used to subscribe a consumer to topics; you can find more about them in the API documentation.
To list all available topics, however, you need to get the list from the Kafka broker itself. What you can do from Ballerina is get the list of topics that a particular consumer is currently subscribed to.
string[] subscribedTopics;
var result = sampleConsumer->getSubscription();
if (result is error) {
    // Your logic for handling the error
} else {
    subscribedTopics = result;
}
Make sure to handle the error here, as getSubscription() can return either a string[] or an error. A Ballerina type guard can do the trick for you.
You can subscribe to a topic using the following code:
This GitHub repo might be quite useful for you. It contains various examples for both consumers and producers.
Regarding your questions for creating and listing topics, if you don't need to perform these actions from Ballerina, you can do so from your command line:
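A minimal sketch of what such commands could look like, using the kafka-topics.sh script that ships with Kafka. The values here are assumptions rather than anything from the question: Kafka 2.2 or later (older releases take --zookeeper instead of --bootstrap-server), a KAFKA_HOME variable pointing at the Kafka installation directory, a broker at localhost:9092, and a topic named test.

```shell
# Assumed values; adjust them to your setup.
KAFKA_TOPICS="${KAFKA_HOME:-.}/bin/kafka-topics.sh"   # KAFKA_HOME: Kafka installation directory
BROKER="localhost:9092"
TOPIC="test"

if [ -x "$KAFKA_TOPICS" ]; then
    # Create the topic with a single partition and a replication factor of 1.
    "$KAFKA_TOPICS" --create --topic "$TOPIC" --bootstrap-server "$BROKER" \
        --partitions 1 --replication-factor 1
    # List every topic known to the broker.
    "$KAFKA_TOPICS" --list --bootstrap-server "$BROKER"
else
    echo "kafka-topics.sh not found at $KAFKA_TOPICS" >&2
fi
```

Once the topic exists, a Ballerina consumer can subscribe to it by name, as in the answer above.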