Apache Beam Consumer Prefix

395 Views Asked by At

I'm trying to setup a simple pipeline using Apache Beam to read data from Kafka. As it is a test, I run the pipeline on a DirectRunner. My consumer group needs to be prefixed with X for authorization reasons. But Apache Beam uses an internal autogenerated prefix already. Is there a way to overwrite this autogenerated prefix?

def run_pipeline():
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from Kafka"
            >> ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": "host:port, \
                                          host:port, \
                                          host:port, \
                                          host:port",
                    "auto.offset.reset": "earliest",
                    "group.id": "X_group",
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanism": "PLAIN",
                    "sasl.jaas.config": f"org.apache.kafka.common.security.plain.PlainLoginModule required username={username} password={password};",
                },
                topics=["topic"],
                max_num_records=10,
            )
            | "Print to console" >> beam.Map(print)
        )

The error message as my group should start with X_group and not end with X_group as Apache Beam does:

trace: "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: Reader-0_offset_consumer_1220072650_X_group\n"
0

There are 0 best solutions below