org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: No filesystem found for scheme gs


I have an Apache Beam multi-language pipeline.

The main pipeline is written in Python, and it calls an external transform written in Java.

The first step of the pipeline calls the Java external transform, which is a composite transform that reads an Avro file and decrypts it.

When I read the Avro file in Python and pass the records to the Java transform, everything works. But that is not what I want: I need to call a proprietary library that only accepts GenericRecords, and I can only create GenericRecord objects in Java.

But when I read the Avro file inside the Java external transform, I get the following error.

Error Message:

Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: No filesystem found for scheme gs
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:887)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:325)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:252)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:824)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:325)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:252)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:142)
    org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2506)
    org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1052)
    org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:799)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:325)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:252)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
    org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:515)
    org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:121)
    org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:142)
    org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:154)
    org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:236)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.splitRestriction(Read.java:288)
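The `Caused by` frames show the failure happens when `FileBasedSource` asks `FileSystems` to match the file pattern: the `gs` scheme is taken from the path and looked up among the file systems the Java worker knows about, and that lookup fails. Below is a simplified, hypothetical model of that lookup. `SchemeLookupSketch` and its methods are invented for illustration and are not Beam's API; the sketch only shows why an unregistered scheme produces exactly this message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified model of a scheme-keyed file system registry.
// It is NOT Beam's FileSystems class; it only mirrors the failure mode.
final class SchemeLookupSketch {
    // A scheme is a letter followed by letters, digits, '+', '-' or '.', then "://".
    private static final Pattern SCHEME = Pattern.compile("^([a-zA-Z][-a-zA-Z0-9+.]*)://.*");

    // Maps a scheme such as "gs" to the name of the file system that handles it.
    private final Map<String, String> fileSystems = new HashMap<>();

    // Paths without an explicit scheme fall back to "file" (local file system).
    static String parseScheme(String spec) {
        Matcher m = SCHEME.matcher(spec);
        return m.matches() ? m.group(1) : "file";
    }

    // In this model, registering stands in for "a registrar was found on the classpath".
    void register(String scheme, String fileSystemName) {
        fileSystems.put(scheme, fileSystemName);
    }

    String lookup(String spec) {
        String scheme = parseScheme(spec);
        String fileSystem = fileSystems.get(scheme);
        if (fileSystem == null) {
            // Same message text as the error in the question.
            throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
        }
        return fileSystem;
    }

    public static void main(String[] args) {
        SchemeLookupSketch withoutGcs = new SchemeLookupSketch();
        withoutGcs.register("file", "LocalFileSystem");
        try {
            withoutGcs.lookup("gs://bucket_name/*.avro");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // No filesystem found for scheme gs
        }

        SchemeLookupSketch withGcs = new SchemeLookupSketch();
        withGcs.register("file", "LocalFileSystem");
        withGcs.register("gs", "GcsFileSystem");
        System.out.println(withGcs.lookup("gs://bucket_name/*.avro")); // GcsFileSystem
    }
}
```

In this model, nothing about the path or the transform changes between the two runs; only whether a `gs` entry was registered does.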

Here is the Java code:

public PCollection expand(PBegin input) {

    return input
            .apply("Read File" , AvroIO.readGenericRecords(avroSchema).from("gs://bucket_name/*.avro"))
            .apply(
                    "Decrypt",
                    ParDo.of(new DecryptDoFn())).setCoder(AvroCoder.of(avroSchema))
            .apply("Generic To Json", ParDo.of(new GenericToJsonDoFn()));
}

How can I make this external transform recognize the GCS location? Is there any way to set options in the Java code so that this transform recognizes the GCS file scheme?

According to the documentation, the FileSystem is set from PipelineOptions. How can I access these options and set them so that the Java external transform recognizes the gs scheme?
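As far as I understand Beam's Java SDK, `PipelineOptions` are handed to each `FileSystemRegistrar`, but which schemes exist at all is decided by the registrar implementations that `java.util.ServiceLoader` finds through `META-INF/services` files on the classpath. If that is right, no option value alone is likely to add the `gs` scheme when the GCS registrar is missing from the expansion service's classpath. The sketch below uses the real `ServiceLoader` with a hypothetical `SchemeRegistrar` interface (not a Beam type) to show that discovery is driven only by what the classpath declares.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Only java.util.ServiceLoader is real API here; SchemeRegistrar is a hypothetical
// stand-in for a registrar interface such as Beam's FileSystemRegistrar.
final class RegistrarDiscoverySketch {
    interface SchemeRegistrar {
        // Returns the URI scheme this registrar contributes, e.g. "gs".
        String scheme();
    }

    // ServiceLoader only finds implementations listed in a file named
    // META-INF/services/<binary name of SchemeRegistrar> on the classpath.
    static List<String> discoverSchemes() {
        List<String> schemes = new ArrayList<>();
        for (SchemeRegistrar registrar : ServiceLoader.load(SchemeRegistrar.class)) {
            schemes.add(registrar.scheme());
        }
        return schemes;
    }

    public static void main(String[] args) {
        // No services file names an implementation, so nothing is discovered,
        // no matter what configuration is passed around elsewhere.
        System.out.println(discoverSchemes()); // []
    }
}
```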

Things I tried:

1. Using the Maven Shade plugin:

    <transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>org.apache.beam.sdk.expansion.service.ExpansionService</mainClass>
        </transformer>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>

But it did not have any effect: I could not see META-INF being updated with the entry META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar.

The current entry in META-INF/services is org.apache.beam.sdk.expansion.ExternalTransformRegistrar.
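`ServicesResourceTransformer` merges same-named `META-INF/services` files from the jars being shaded; it cannot create an entry that no dependency provides. If I read Beam's layout correctly, the GCS registrar (`org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar`) ships in the `beam-sdks-java-extensions-google-cloud-platform-core` artifact, so a `FileSystemRegistrar` services entry for it would only appear if that artifact is a dependency of the expansion service jar. The model below mimics the merge; `ServicesMergeSketch`, the jar contents, and `com.example.MyTransformRegistrar` are hypothetical and for illustration only.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of merging META-INF/services files while shading several jars.
// Each "jar" is a map from a resource path to the lines of that resource.
final class ServicesMergeSketch {
    static Set<String> mergedEntries(String serviceFile, List<Map<String, List<String>>> jars) {
        Set<String> merged = new LinkedHashSet<>(); // keeps first-seen order, drops duplicates
        for (Map<String, List<String>> jar : jars) {
            for (String line : jar.getOrDefault(serviceFile, List.of())) {
                // Like ServiceLoader, ignore everything after '#' and surrounding whitespace.
                int hash = line.indexOf('#');
                String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
                if (!entry.isEmpty()) {
                    merged.add(entry);
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        String registrars = "META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar";

        // Illustrative jar contents, not a listing of real artifacts.
        Map<String, List<String>> expansionServiceJar = Map.of(
                "META-INF/services/org.apache.beam.sdk.expansion.ExternalTransformRegistrar",
                List.of("com.example.MyTransformRegistrar")); // hypothetical class name
        Map<String, List<String>> gcpCoreJar = Map.of(
                registrars,
                List.of("org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar"));

        // Without the GCP extension among the shaded jars there is nothing to merge.
        System.out.println(mergedEntries(registrars, List.of(expansionServiceJar))); // []
        // With it, the GCS registrar entry appears in the merged services file.
        System.out.println(mergedEntries(registrars, List.of(expansionServiceJar, gcpCoreJar)));
    }
}
```

Under these assumptions, the missing `FileSystemRegistrar` entry points to a missing dependency rather than a misconfigured transformer.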
