I have the code below for connecting to kafka using the python beam sdk. I know that the ReadFromKafka
transform is run in a java sdk harness (docker container) but I have not been able to figure out how to make ssl.truststore.location
and ssl.keystore.location
accesible inside the sdk harness' docker environment. The job_endpoint
argument is pointing to java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081
pipeline_args.extend([
'--job_name=paul_test',
'--runner=PortableRunner',
'--sdk_location=container',
'--job_endpoint=localhost:8099',
'--streaming',
"--environment_type=DOCKER",
f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
kafka = pipeline | ReadFromKafka(
consumer_config={
"bootstrap.servers": "bootstrap-server:17032",
"security.protocol": "SSL",
"ssl.truststore.location": "/opt/keys/client.truststore.jks", # how do I make this available to the Java SDK harness
"ssl.truststore.password": "password",
"ssl.keystore.type": "PKCS12",
"ssl.keystore.location": "/opt/keys/client.keystore.p12", # how do I make this available to the Java SDK harness
"ssl.keystore.password": "password",
"group.id": "group",
"basic.auth.credentials.source": "USER_INFO",
"schema.registry.basic.auth.user.info": "user:password"
},
topics=["topic"],
max_num_records=2,
# expansion_service="localhost:56938"
)
kafka | beam.Map(lambda x: print(x))
I tried specifying the image override option as --sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'
- where beam_java_sdk:latest
is a docker image I based on apache/beam_java11_sdk:2.27.0
and that pulls the credetials in its entrypoint.sh. But Beam does not appear to use it, I see
INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1
in the logs. Which is soon inevitebly followed by
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12
In conclusion, my question is this, In Apache Beam, is it possible to make files available inside java sdk harness docker container from the python beam sdk? If so, how might it be done?
Many thanks.
Actually, you can override default expansion service, when you use Kafka Connector. To do that you need to define expansion service like that: