I have an Apache Beam multi-language pipeline.
The main pipeline is in Python, and it calls an external transform written in Java.
The first step of the pipeline calls the Java external transform, which is a composite transform that reads an Avro file and decrypts it.
When I read the Avro file from Python and pass the records to the Java transform, everything works. But that is not what I want: I need to call a proprietary library that accepts only GenericRecord objects, and I can only create GenericRecord objects in Java.
When I read the Avro file from within the Java external transform instead, I get the error below.
Error Message:
Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: No filesystem found for scheme gs
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:887)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:325)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:252)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:824)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:325)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:252)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:142)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2506)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1052)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:799)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:325)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:252)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:515)
org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:121)
org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:142)
org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:154)
org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:236)
org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.splitRestriction(Read.java:288)
Attaching the Java code:
public PCollection expand(PBegin input) {
    return input
        .apply("Read File", AvroIO.readGenericRecords(avroSchema).from("gs://bucket_name/*.avro"))
        .apply("Decrypt", ParDo.of(new DecryptDoFn()))
        .setCoder(AvroCoder.of(avroSchema))
        .apply("Generic To Json", ParDo.of(new GenericToJsonDoFn()));
}
How can I make this external transform recognize the GCS location? Is there any way to set some options in the Java code so that the transform recognizes the gs file scheme?
When I look at the documentation, the filesystems are set up from PipelineOptions. How can I access these options and set them so that the Java external transform recognizes the scheme?
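To narrow this down, here is a minimal diagnostic sketch. It assumes that Beam discovers filesystems through java.util.ServiceLoader, using the org.apache.beam.sdk.io.FileSystemRegistrar service file (which is what the META-INF/services entry suggests). It prints the registrar implementations visible on the classpath it is run with. The class name RegistrarCheck is hypothetical, and the interface is looked up by name so that the snippet compiles without Beam:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical helper, not part of Beam.
public class RegistrarCheck {

    /** Returns the class names of all providers ServiceLoader can see for the named interface. */
    static List<String> providerNames(String serviceInterface) {
        List<String> names = new ArrayList<>();
        try {
            Class<?> service = Class.forName(serviceInterface);
            for (Object provider : ServiceLoader.load(service)) {
                names.add(provider.getClass().getName());
            }
        } catch (ClassNotFoundException e) {
            // The interface itself is not on the classpath, so nothing can be registered.
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumption: this is the service interface Beam uses to register filesystems.
        String service = "org.apache.beam.sdk.io.FileSystemRegistrar";
        List<String> names = providerNames(service);
        if (names.isEmpty()) {
            System.out.println("No providers visible for " + service);
        } else {
            names.forEach(System.out::println);
        }
    }
}
```

Running it with the shaded expansion-service jar on the classpath should show whether any registrar is packaged at all. The sketch makes no assumption about which registrar class handles gs.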
Things I tried:
1. Using the Maven Shade plugin:
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>org.apache.beam.sdk.expansion.service.ExpansionService</mainClass>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
But it did not have any effect: I could not see META-INF being updated with the entry META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar.
The current entry in META-INF/services is org.apache.beam.sdk.expansion.ExternalTransformRegistrar
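For reference, the check of META-INF/services described above can be reproduced with a small sketch that uses only the JDK. The class name ServiceEntryLister is hypothetical, and the jar path is passed as the first argument:

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

// Hypothetical helper: lists the service descriptor files packaged in a jar.
public class ServiceEntryLister {
    static final String PREFIX = "META-INF/services/";

    /** Keeps only files under META-INF/services/ and strips that prefix, sorted by name. */
    static List<String> serviceFiles(List<String> entryNames) {
        return entryNames.stream()
            .filter(n -> n.startsWith(PREFIX) && n.length() > PREFIX.length())
            .map(n -> n.substring(PREFIX.length()))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path to the shaded jar to inspect.
        try (JarFile jar = new JarFile(args[0])) {
            List<String> names = jar.stream()
                .map(JarEntry::getName)
                .collect(Collectors.toList());
            serviceFiles(names).forEach(System.out::println);
        }
    }
}
```

If the services transformer merged the descriptors, org.apache.beam.sdk.io.FileSystemRegistrar would be expected to appear in this listing, provided that a dependency shipping such a descriptor is on the shaded classpath.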