I'm trying to set up a simple pipeline using Apache Beam to read data from Kafka. As it is a test, I run the pipeline on the DirectRunner. My consumer group needs to be prefixed with X for authorization reasons, but Apache Beam already adds its own internally autogenerated prefix. Is there a way to override this autogenerated prefix?
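```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline(username, password):
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from Kafka"
            >> ReadFromKafka(
                consumer_config={
                    # Comma-separated broker list; adjacent string literals avoid
                    # the stray whitespace that backslash continuations embed.
                    "bootstrap.servers": "host:port,"
                    "host:port,"
                    "host:port,"
                    "host:port",
                    "auto.offset.reset": "earliest",
                    # Must start with "X" to satisfy the group authorization.
                    "group.id": "X_group",
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanism": "PLAIN",
                    # JAAS option values are quoted.
                    "sasl.jaas.config": (
                        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        f'username="{username}" password="{password}";'
                    ),
                },
                topics=["topic"],
                max_num_records=10,
            )
            | "Print to console" >> beam.Map(print)
        )
```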
The error message below shows the problem: my group ID should start with `X_group`, but Apache Beam generates a group ID that ends with `X_group` instead:

```
trace: "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: Reader-0_offset_consumer_1220072650_X_group\n"
```
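To make the mismatch concrete, here is a small sketch that splits the group name from the trace and mimics a prefix check. It assumes the authorization is a prefixed group ACL on `X` (Kafka's PREFIXED resource pattern type), and the name pattern `<reader>_offset_consumer_<random>_<group.id>` is only inferred from the single trace above, not taken from Beam documentation. The helpers `split_generated_group` and `allowed_by_prefix_acl` are hypothetical.

```python
import re

# Pattern inferred from the trace above (an assumption, not a documented Beam contract):
# <reader name>_offset_consumer_<random int>_<configured group.id>
GENERATED_GROUP = re.compile(
    r"^(?P<reader>.+?)_offset_consumer_(?P<rand>\d+)_(?P<group_id>.+)$"
)


def split_generated_group(name):
    """Return (reader, random_id, configured_group_id), or None if name doesn't match."""
    m = GENERATED_GROUP.match(name)
    if m is None:
        return None
    return m.group("reader"), int(m.group("rand")), m.group("group_id")


def allowed_by_prefix_acl(group, prefix):
    """Mimic a prefixed group ACL: access is granted only if the name starts with prefix."""
    return group.startswith(prefix)


configured = "X_group"
generated = "Reader-0_offset_consumer_1220072650_X_group"

print(split_generated_group(generated))        # ('Reader-0', 1220072650, 'X_group')
print(allowed_by_prefix_acl(configured, "X"))  # True
print(allowed_by_prefix_acl(generated, "X"))   # False
```

The configured `group.id` survives only as a suffix, so a check keyed on the start of the name rejects the generated group even though the consumer group itself is named correctly.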