How to connect Kafka IO from Apache Beam to a cluster in Confluent Cloud


I've made a simple pipeline in Python to read from Kafka. The problem is that the Kafka cluster is on Confluent Cloud and I am having some trouble connecting to it.

I'm getting the following log on the Dataflow job:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$GenerateKafkaSourceDescriptor.processElement(KafkaIO.java:1495)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

So I think I'm missing something when passing the config, since the error mentions the JAAS configuration. I'm really new to all of this and I know nothing about Java, so I don't know how to proceed even after reading the JAAS documentation.

The code of the pipeline is the following:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'

# Load the Confluent Cloud cluster configuration from a local JSON file.
with open('cluster.configuration.json') as cluster:
    data = json.load(cluster)

def logger(element):
    logging.info('Something was found')
      
def main():
    config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":data["security.protocol"],
        "sasl.mechanisms":data["sasl.mechanisms"],
        "sasl.username":data["sasl.username"],
        "sasl.password":data["sasl.password"],
        "session.timeout.ms":data["session.timeout.ms"],
        "auto.offset.reset":"earliest"
    }
    print('======================================================')
    beam_options = PipelineOptions(
        runner='DataflowRunner',
        project='project',
        experiments=['use_runner_v2'],
        streaming=True,
        save_main_session=True,
        job_name='kafka-stream-test',
    )
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(
            consumer_config=config,
            topics=['users'],
            expansion_service="localhost:8088",
        )
        msgs | beam.Map(logger)
        
if __name__ == '__main__':    
    main()

I read something about passing a property java.security.auth.login.config in the config dictionary, but since that example was in Java and I'm using Python, I'm really lost about what I have to pass, or even whether that's the right property to pass.

By the way, I'm getting the API key and secret from the page shown below, and this is what I am passing to sasl.username and sasl.password:

[screenshot: Confluent Cloud API key and secret]


There are 2 answers below.

Accepted answer:

I faced the same error the first time I tried Beam's expansion service. The key sasl.mechanisms that you are supplying is incorrect; try sasl.mechanism instead. You also do not need to supply the username and password separately, since the connection is authenticated through the JAAS config. Basically, a consumer_config like the one below worked for me:

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": data["security.protocol"],
    "sasl.mechanism": data["sasl.mechanisms"],
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'serviceName="Kafka" username="{data["sasl.username"]}" '
        f'password="{data["sasl.password"]}";'
    ),
    "auto.offset.reset": "earliest",
}
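
For completeness, here is a minimal sketch of how this consumer_config plugs into ReadFromKafka (the topic name 'users' and the localhost:8088 expansion service address are taken from the question; adjust them to your setup):

import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: 'config' is the consumer_config dict shown above.
beam_options = PipelineOptions(streaming=True, save_main_session=True)
with beam.Pipeline(options=beam_options) as p:
    (p
     | 'ReadKafka' >> ReadFromKafka(
           consumer_config=config,
           topics=['users'],
           expansion_service="localhost:8088")
     | 'Log' >> beam.Map(lambda record: logging.info('Got record: %s', record)))
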
Second answer:

I got a partial answer to this question, since I fixed this problem but ran into another one:

config = {
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": data["security.protocol"],
    "sasl.mechanisms": data["sasl.mechanisms"],
    "sasl.username": data["sasl.username"],
    "sasl.password": data["sasl.password"],
    "session.timeout.ms": data["session.timeout.ms"],
    "group.id": "tto",
    "sasl.jaas.config": (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'serviceName="Kafka" username="{data["sasl.username"]}" '
        f'password="{data["sasl.password"]}";'
    ),
    "auto.offset.reset": "earliest",
}
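
Just to make the quoting easier to follow, the same sasl.jaas.config string can be assembled in pieces (a sketch, equivalent to the f-string above):

# Build the sasl.jaas.config value step by step; the username and password
# are the Confluent Cloud API key and secret.
jaas_config = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'serviceName="Kafka" '
    f'username="{data["sasl.username"]}" '
    f'password="{data["sasl.password"]}";'
)
config["sasl.jaas.config"] = jaas_config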

I needed to provide the sasl.jaas.config property with the API key and secret of my cluster, and also the service name. However, now I'm facing a different error when running the pipeline on Dataflow:

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 

This error shows up after 4-5 minutes of trying to run the job on Dataflow. I honestly have no idea how to fix it, but I think it is related to my broker on Confluent rejecting the connection; it could have to do with the execution zone, since the cluster is in a different zone than the job region.
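
One way to narrow this down is to check, outside of Beam, whether the broker is reachable with the same credentials, for example with the confluent_kafka client (a debugging sketch, assuming the confluent-kafka package is installed; if this also times out, the problem is network or credentials rather than the pipeline itself):

from confluent_kafka.admin import AdminClient

# Note: the confluent_kafka/librdkafka client takes sasl.username/sasl.password
# directly instead of a JAAS string.
admin = AdminClient({
    "bootstrap.servers": data["bootstrap.servers"],
    "security.protocol": data["security.protocol"],
    "sasl.mechanisms": data["sasl.mechanisms"],
    "sasl.username": data["sasl.username"],
    "sasl.password": data["sasl.password"],
})
metadata = admin.list_topics(timeout=10)  # raises KafkaException on failure
print(list(metadata.topics.keys()))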

UPDATE:

I tested the code on Linux/Ubuntu, and I don't know why, but the expansion service gets downloaded automatically, so you won't get the unsupported signal error. I'm still having some issues trying to authenticate to Confluent Kafka, though.