How to rename GCS files in Spark running on Dataproc Serverless?


After writing a Spark DataFrame to a file, I am attempting to rename the file with code like the following:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(path + "/part*"))(0).getPath().getName()
fs.rename(new Path(path + "/" + file), new Path(path + "/" + fileName))

This works great when running Spark locally. However, when I run my jar on Dataproc, I get an error like the one below:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-***, in path: gs://prj-*****/part*, expected bucket: dataproc-temp-***

It seems the files may not be saved to the target bucket until the end of the job, which is why I am struggling to rename them. I have tried setting .option("mapreduce.fileoutputcommitter.algorithm.version", "2"), as I read something about this option that looked promising.
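Roughly, that attempt looked like the sketch below; df and the Parquet output format are placeholders, since the full write is not shown here:

// Sketch of the committer-version attempt; df and the Parquet format are placeholders.
spark.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.algorithm.version", "2")

df.write
  .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
  .parquet(path)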

Update: Still no luck. It seems that spark.sparkContext.hadoopConfiguration expects the base bucket to be a dataproc-temp-* bucket. Full stack trace below:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-**, in path: gs://p**, expected bucket: dataproc-temp-u***
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:95)
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:667)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:394)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:149)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1085)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1059)

1 Answer


The HCFS instance returned by the FileSystem.get(...) call is tied to a specific file system (in this case, a GCS bucket). By default, Dataproc Serverless for Spark is configured to use a gs://dataproc-temp-*/ bucket as the default HCFS via the spark.hadoop.fs.defaultFS Spark property.
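You can confirm this from inside the job; the sketch below simply reads the standard fs.defaultFS Hadoop property and prints it:

// Prints something like gs://dataproc-temp-.../ on Dataproc Serverless,
// which is why paths in other buckets fail the bucket check.
val defaultFs = spark.sparkContext.hadoopConfiguration.get("fs.defaultFS")
println(s"Default HCFS: $defaultFs")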

To solve this issue, you need to create the HCFS instance using the FileSystem#get(URI uri, Configuration conf) call:

val fs = FileSystem.get(path.toUri, spark.sparkContext.hadoopConfiguration)
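Put together, the rename logic could look like the sketch below; outputDir (the gs:// directory the DataFrame was written to) and fileName are placeholder names:

import org.apache.hadoop.fs.{FileSystem, Path}

// Resolve the FileSystem from the output path's own URI, so it points at the
// target bucket rather than the default dataproc-temp-* bucket.
val outputPath = new Path(outputDir)
val fs = FileSystem.get(outputPath.toUri, spark.sparkContext.hadoopConfiguration)

// Find the single part file Spark produced and rename it in place.
val partFile = fs.globStatus(new Path(outputDir + "/part*"))(0).getPath().getName()
fs.rename(new Path(outputDir + "/" + partFile), new Path(outputDir + "/" + fileName))

Because the FileSystem is derived from the output path itself, the bucket check in GoogleHadoopFileSystem.checkPath no longer compares against the dataproc-temp-* bucket.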