Write Spark dataframe into partitioned Google BigQuery table using per-write credentials

I am running Spark in-process, so the cluster is also configured entirely in code.

from pyspark.sql import SparkSession

required_jars = [
    "./spark_dependencies/gcs-connector-hadoop2-latest.jar",
    "./spark_dependencies/spark-avro_2.12-3.3.0.jar",
    "./spark_dependencies/spark-bigquery-with-dependencies_2.12-0.28.0.jar"
]
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .appName("My App")
    .config("spark.jars", ",".join(required_jars))
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .getOrCreate()
)

I am trying to write a dataframe to Google BigQuery using the Apache Spark SQL connector for Google BigQuery. My destination table is partitioned, so I cannot use the direct write method; instead I have to use the indirect method, which first writes the data to Google Cloud Storage using the Google Cloud Storage Connector for Spark and Hadoop.

import base64
import json

from google.oauth2 import service_account

service_account_info = {"MY": "SERVICE ACCOUNT INFO"}

# Credentials object for client libraries that accept it directly (e.g. pandas-gbq below).
credentials = service_account.Credentials.from_service_account_info(service_account_info)

# The Spark BigQuery connector expects the service account key as base64-encoded JSON.
credentials_base64 = base64.b64encode(json.dumps(service_account_info).encode("utf-8")).decode("utf-8")

df = spark.createDataFrame(
    [
        ('a', 1),
        ('b', 2),
        ('c', 3)
    ],
    ["id", "value"]
)

(
    df
        .write
        .format("bigquery")
        .mode("append")
        .option("parentProject", "my-project")
        .option("credentials", credentials)
        .option("createDisposition", "CREATE_NEVER")
        .option("temporaryGcsBucket", "my-bucket")
        .option("intermediateFormat", "avro")
        .option("useAvroLogicalTypes", "true")
        .save("my-partitioned-table")
)

I am running a service for multiple clients, each of which has its own service account, so I cannot configure a single service account for the whole project and have to define the credentials per read/write, as shown in the code snippet. The problem is that the BigQuery connector does not configure the GCS Hadoop connector as I assumed, as stated in the connector documentation:

Important: The connector does not configure the GCS connector, in order to avoid conflict with another GCS connector, if exists. In order to use the write capabilities of the connector, please configure the GCS connector on your cluster as explained here.

This forces me to configure the service account for the GCS connector for the entire Spark cluster as described here (the exact configuration parameters are not documented, but they are supported), by adding this to the Spark config when creating the session:

    .config("spark.hadoop.fs.gs.auth.service.account.private.key.id", "my-private-key-id")
    .config("spark.hadoop.fs.gs.auth.service.account.private.key", "my-private_key")
    .config("spark.hadoop.fs.gs.auth.service.account.email", "my-client_email")

This resolves the problem for a single client whose service account I am using; otherwise I have to change the Spark cluster configuration before each write operation, which is risky because we might run into race conditions if writes for different clients run in parallel.
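
To make it concrete, "changing the Spark cluster configuration before each write" would look roughly like the sketch below, which mutates the live Hadoop configuration through PySpark's internal JVM gateway, reusing the key names from the snippet above. This is exactly the part I consider unsafe: the Hadoop configuration is shared by the whole session, and Hadoop also caches FileSystem instances, so new credentials might not even be picked up without disabling the cache.

# Risky per-write workaround (sketch only): mutate the shared Hadoop configuration
# right before the write. Not safe under concurrency, because parallel writes for
# different clients all see this same configuration.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.gs.auth.service.account.private.key.id", "client-private-key-id")
hadoop_conf.set("fs.gs.auth.service.account.private.key", "client-private-key")
hadoop_conf.set("fs.gs.auth.service.account.email", "client-email")
# Hadoop caches FileSystem instances, so a GoogleHadoopFileSystem created with the
# previous credentials may be reused; disabling the cache avoids that at the cost
# of recreating the FileSystem for every job.
hadoop_conf.set("fs.gs.impl.disable.cache", "true")
# ...then run the DataFrame write shown earlier.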

What would be the recommended approach to handle this case? Did I miss a configuration parameter that could help me with this? Unfortunately, the documentation of the BigQuery connector did not help much here, as credentials delegation is only mentioned in the note quoted above. Is there a workaround until the direct write method is fully functional and supports partitioned tables?

For now, I figured that I can use Pandas as an intermediate step, with my table created in advance and the partitioning already configured, since pandas-gbq does not support setting up the partitioning either:

df.toPandas().to_gbq(
    destination_table="my-partitioned-table",
    project_id="my-project",
    reauth=True,
    if_exists='append',
    credentials=credentials
)
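
Since pandas-gbq cannot configure the partitioning, I create the table in advance with the BigQuery client library. A minimal sketch of that step, assuming ingestion-time DAY partitioning and a placeholder fully qualified table name (a column-partitioned table would pass field=... to TimePartitioning instead):

from google.cloud import bigquery

# Reuse the per-client service account credentials from above.
client = bigquery.Client(project="my-project", credentials=credentials)

table = bigquery.Table(
    "my-project.my_dataset.my_partitioned_table",  # placeholder fully qualified name
    schema=[
        bigquery.SchemaField("id", "STRING"),
        bigquery.SchemaField("value", "INTEGER"),
    ],
)
table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY)
client.create_table(table, exists_ok=True)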

Edit: For completeness, configuring only the credentials for the write operation raises an error with the following stack trace:

py4j.protocol.Py4JJavaError: An error occurred while calling o94.save.
: java.io.UncheckedIOException: java.io.IOException: Error getting access token from metadata server at: http://a.b.c.d/computeMetadata/v1/instance/service-accounts/default/token
at com.google.cloud.spark.bigquery.SparkBigQueryUtil.createGcsPath(SparkBigQueryUtil.java:113)
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.<init>(BigQueryWriteHelper.java:80)
at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:41)
at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Error getting access token from metadata server at: http://a.b.c.d/computeMetadata/v1/instance/service-accounts/default/token
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:253)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:392)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1344)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1501)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1483)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:470)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at com.google.cloud.spark.bigquery.SparkBigQueryUtil.getUniqueGcsPath(SparkBigQueryUtil.java:127)
at com.google.cloud.spark.bigquery.SparkBigQueryUtil.createGcsPath(SparkBigQueryUtil.java:108)
... 45 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
at java.base/java.net.Socket.connect(Socket.java:609)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:507)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:602)
at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:275)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:374)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:395)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1253)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1015)
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:151)
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:195)
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:470)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:250)
... 58 more