Apache Spark GCS Connector Issue

70 Views Asked by At

I'm trying to write the parquet file to the GCP bucket using the spark job. Currently, I'm getting an "InvocationTargetException" error.

More details on the error,

Exception in thread "main" java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3467) at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:461) at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:558) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793) at com.example.jay.LoadToBucket$.writeGcpBucketAsParquet(LoadToBucket.scala:9) at com.example.jay.ExampleWrite.main(ExampleWrite.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.reflect.InvocationTargetException at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135) ... 28 more Caused by: java.lang.VerifyError: Bad type on operand stack Exception Details: Location: com/google/api/ClientProto.registerAllExtensions(Lcom/google/protobuf/ExtensionRegistryLite;)V @4: invokevirtual Reason: Type 'com/google/protobuf/GeneratedMessage$GeneratedExtension' (current frame, stack[1]) is not assignable to 'com/google/protobuf/ExtensionLite' Current Frame: bci: @4 flags: { } locals: { 'com/google/protobuf/ExtensionRegistryLite' } stack: { 'com/google/protobuf/ExtensionRegistryLite', 'com/google/protobuf/GeneratedMessage$GeneratedExtension' }
Bytecode: 0000000: 2ab2 0002 b600 032a b200 04b6 0003 2ab2 0000010: 0005 b600 03b1

at com.google.storage.v2.StorageProto.(StorageProto.java:872) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions.(GoogleCloudStorageOptions.java:55) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.(GoogleHadoopFileSystemConfiguration.java:358) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.(GoogleHadoopFileSystemBase.java:250) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.(GoogleHadoopFileSystem.java:58) ... 33 more

Spark program:

  def writeGcpBucketAsParquet(df: DataFrame, bucketSuffix: String): Unit = {
    var path =gcpBucketPath + bucketSuffix
    df.repartition(1).write.mode(SaveMode.Overwrite).parquet(path)

  }

pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.exe</groupId>
    <artifactId>SparkExample</artifactId>

    <version>1.0</version>
    <packaging>jar</packaging>

    <organization>
        <name>example</name>
        <url>https://example.com/</url>
    </organization>

    <properties>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.3.2</spark.version>
        <org.apache.logging.log4j.version>2.19.0</org.apache.logging.log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-kubernetes_2.3</artifactId>
            <version>${spark.version}</version>
        </dependency>-->

        <!-- Read application.conf file -->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.4.2</version>
        </dependency>

        <!-- Postgres DB -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.5.4</version>
        </dependency>
        <!-- GCS Connector Hadoop3 for write data to bucket -->
        <!--<dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>gcs-connector</artifactId>
            <version>hadoop2-2.2.17</version>
        </dependency>-->
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>gcs-connector</artifactId>
            <version>hadoop3-2.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>gcsio</artifactId>
            <version>2.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>util</artifactId>
            <version>2.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>util-hadoop</artifactId>
            <version>hadoop3-2.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>2.27.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>bigquery-connector</artifactId>
            <version>hadoop3-1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.2-jre</version>
        </dependency>
        <dependency>
            <groupId>com.google.oauth-client</groupId>
            <artifactId>google-oauth-client</artifactId>
            <version>1.34.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.24.4</version>
        </dependency>
        <dependency>
            <groupId>com.google.auto.value</groupId>
            <artifactId>auto-value-annotations</artifactId>
            <version>1.10.4</version>
        </dependency>
        <dependency>
            <groupId>com.google.flogger</groupId>
            <artifactId>flogger</artifactId>
            <version>0.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.google.flogger</groupId>
            <artifactId>google-extensions</artifactId>
            <version>0.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.google.flogger</groupId>
            <artifactId>flogger-system-backend</artifactId>
            <version>0.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-bom</artifactId>
            <version>1.58.0</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>libraries-bom</artifactId>
            <version>26.24.0</version>
            <type>pom</type>
        </dependency>

    </dependencies>


    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/main/resources</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>


    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/main/resources</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.8.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.exe.SparkExample</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
</project>
0

There are 0 best solutions below