Writing a PySpark DataFrame to DigitalOcean Spaces results in a Forbidden (403) error


Attempting to write a PySpark DataFrame to DigitalOcean Spaces results in a "Forbidden (403)" error. When using the provided function get_spark() and writing a DataFrame with test_spaces(), the error below is raised. The stack trace suggests an issue with access permissions, but I can access Spaces with the same key/secret through the Python boto client. I'm currently on PySpark 3.5 and I'm using the jar-file configuration from here.
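
For reference, the kind of boto3 check that succeeds with the same credentials looks roughly like this (a sketch; the bucket name, the list call, and the KEY/SECRET/REGION placeholders are just illustrative, matching get_spark() below):

import boto3

# Sanity check with the plain boto3 client: this call works with the same
# key/secret, so the credentials themselves are valid for Spaces.
client = boto3.client(
    "s3",
    endpoint_url="https://REGION.digitaloceanspaces.com",
    aws_access_key_id="KEY",
    aws_secret_access_key="SECRET",
)
print(client.list_objects_v2(Bucket="bucket_name", MaxKeys=5))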

from pyspark.sql import SparkSession


def get_spark() -> SparkSession:
    """Provides a well configured spark session"""
    return (
        SparkSession.builder.master("local[*]")
        .appName("test")
        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.jars.packages",
            "io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262",
        )
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.hadoop.fs.s3a.access.key", "KEY")
        .config("spark.hadoop.fs.s3a.secret.key", "SECRET")
        .config("spark.haddop.fs.s3a.endpoint", "https://REGION.digitaloceanspaces.com")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.sql.warehouse.dir", WAREHOUSE_LOCATION)
        .enableHiveSupport()
        .getOrCreate()
    )


def test_spaces():
    """Creates the NHL roster table in the bronze layer"""
    spark = get_spark()
    # Create a simple DataFrame
    data = [("John", 25), ("Alice", 30), ("Bob", 28)]
    columns = ["Name", "Age"]
    df = spark.createDataFrame(data, columns)

    # Show the DataFrame
    df.show()

    # Write DataFrame to DigitalOcean Spaces
    df.write.json(f"s3a://bucket_name/test")

Stacktrace:

py4j.protocol.Py4JJavaError: An error occurred while calling o65.json.
: java.nio.file.AccessDeniedException: s3a://PATH: getFileStatus on s3a://PATH: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 069ZFJ7PEE4SDT1B; S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==; Proxy: null), S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==:403 Forbidden
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:120)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
        at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:774)

2 Answers

nerdizzle (accepted answer)

It was a mean typo in .config("spark.haddop.fs.s3a.endpoint", "https://REGION.digitaloceanspaces.com"). After changing haddop to hadoop, everything worked as expected.
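
For anyone copying the snippet, a minimal sketch of the fixed builder (KEY/SECRET/REGION placeholders as in the question; the hadoop-aws jars from the question's packages list are still required to actually run it):

from pyspark.sql import SparkSession

# The prefix must be "spark.hadoop.", not "spark.haddop."; with the typo the
# endpoint setting is silently dropped and the request goes to AWS S3 instead
# of DigitalOcean Spaces, which returns 403.
spark = (
    SparkSession.builder.master("local[*]")
    .appName("test")
    .config("spark.hadoop.fs.s3a.access.key", "KEY")
    .config("spark.hadoop.fs.s3a.secret.key", "SECRET")
    .config("spark.hadoop.fs.s3a.endpoint", "https://REGION.digitaloceanspaces.com")
    .getOrCreate()
)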

stevel
  1. Cut the fs.s3a.impl line. That's a superstition passed down stack overflow posts, one copy and paste at a time.
  2. The fact that the error includes an extended request ID (S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==; Proxy: null) hints that the request has gone to AWS S3, not digitaloceanspaces, unless they also include such IDs (I've never known a third-party store which does...).

Make sure those s3a binding settings are actually getting through to your code and the s3a connector. I'll leave that as an exercise in filesystem API work and debugging, as those are essential skills and, since it's a config issue on your side, there's nothing anyone else can do.
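
One way to do that check (a sketch; it reads back the resolved Hadoop configuration from the running session, using the key names from the question):

spark = get_spark()

# _jsc is an internal handle, but it is the usual way to peek at the Hadoop
# configuration the s3a connector will actually see from PySpark.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
print(hconf.get("fs.s3a.endpoint"))    # with the "haddop" typo this prints None,
                                       # so s3a falls back to the AWS default
print(hconf.get("fs.s3a.access.key"))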