400 : Bad Request, py4j.protocol.Py4JJavaError: An error occurred while calling o44.save


After some research I am able to connect to Redshift from PySpark and read a table into a Spark DataFrame. Now I am trying to insert that DataFrame into another Redshift table with the same structure. Here is the code I use to connect to S3/Redshift and read the data into the spark_df variable:

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))

# S3 credentials for the s3n filesystem that the Redshift connector
# uses for its tempdir
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", "XXXXXXXXXXXXXXXX")
hadoop_conf.set("fs.s3n.awsSecretAccessKey", "XXXXXXXXXXXXXXXXX")

sql_context = SQLContext(sc)

# Sanity check: read a local JSON file
df = sql_context.read.json("events_20191101_000000000050_00.json")
print(df.count())

# Read data from a Redshift table; the connector unloads it to tempdir
# on S3 and reads it from there
spark_df = sql_context.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://redshift-cluster-1.XXXXXXXXXXXXXXXXXXXXXXXXXXX") \
    .option("dbtable", "public.events_20190310") \
    .option("tempdir", "s3n://big-query-to-rs/rs_temp_data") \
    .load()

print("loaded into spark df")
print("spdf ", spark_df)
print(spark_df.count())
print("----------------------------------------------------")



# Write the DataFrame back to a Redshift table with the same structure;
# the connector stages the data in tempdir on S3 and then runs a COPY
spark_df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshift-cluster-1.XXXXXXXXXXXXXXXXXXXXXXXXXXX") \
  .option("dbtable", "public.events_20000101") \
  .option("tempdir", "s3n://big-query-to-rs/rs_temp_data") \
  .mode("overwrite") \
  .save()

Writing spark_df back to Redshift with spark_df.write fails with the following error:

line 38, in <module>
    .mode("overwrite") \
  File "/home/ubuntu/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 701, in save
  File "/home/ubuntu/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/home/ubuntu/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/ubuntu/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.save.
: java.io.IOException: s3n://big-query-to-rs : 400 : Bad Request
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:453)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at org.apache.hadoop.fs.s3native.$Proxy20.retrieveMetadata(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:92)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:278)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:346)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.jets3t.service.impl.rest.HttpException: 400 Bad Request
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:425)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:279)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1052)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2264)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2193)
    at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
    at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
    ... 60 more

2019-12-19 06:26:31 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2019-12-19 06:26:31 INFO  AbstractConnector:318 - Stopped Spark@145d5bb9{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
2019-12-19 06:26:31 INFO  SparkUI:54 - Stopped Spark web UI at http://ip-172-30-1-193.ap-south-1.compute.internal:4041
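Reading the trace, the 400 is raised before any Redshift work happens: it comes from a HEAD (metadata) request against s3n://big-query-to-rs inside Hadoop's native S3 filesystem (Jets3tNativeFileSystemStore.retrieveMetadata), while the connector checks whether the tempdir exists. To confirm the S3 layer itself is what fails, a minimal write check I can run (a sketch reusing the context above; the write_test suffix is a hypothetical path) is:

# Sketch: direct s3n write, bypassing the Redshift connector entirely
test_df = sql_context.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
test_df.write.mode("overwrite").json("s3n://big-query-to-rs/rs_temp_data/write_test")
print("plain s3n write succeeded")

If this fails with the same 400, the problem is in the s3n configuration rather than in spark-redshift.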

I have tried all of the save modes (append, overwrite, ignore, error); none of them inserts the data into Redshift, and each fails with a similar error, as in the sketch below.
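For completeness, this is the shape of what I tried (a sketch; same URL, table, and tempdir as above):

# Sketch: the same write, looping over every save mode I tried;
# each attempt fails with the 400 Bad Request shown above
for save_mode in ("append", "overwrite", "ignore", "error"):
    try:
        spark_df.write \
            .format("com.databricks.spark.redshift") \
            .option("url", "jdbc:redshift://redshift-cluster-1.XXXXXXXXXXXXXXXXXXXXXXXXXXX") \
            .option("dbtable", "public.events_20000101") \
            .option("tempdir", "s3n://big-query-to-rs/rs_temp_data") \
            .mode(save_mode) \
            .save()
        print(save_mode, "succeeded")
    except Exception as e:
        print(save_mode, "failed:", e)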

Any lead is highly appreciated. Thanks a lot.

FYI: spark_df has around 5 million rows, roughly 10 GB of data.
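One more detail that may be relevant: the driver host is in ap-south-1 (see the shutdown log above), and buckets in newer regions such as ap-south-1 accept only AWS Signature Version 4 requests, which the jets3t library behind s3n does not support. If the bucket lives in such a region, a variant I am considering (a sketch; it assumes the hadoop-aws jar matching my Hadoop 2.7 build and a compatible aws-java-sdk jar are on the classpath) is switching the tempdir to s3a with an explicit regional endpoint:

# Sketch: s3a configuration with an explicit regional endpoint,
# since s3n/jets3t cannot sign requests with Signature V4
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "XXXXXXXXXXXXXXXX")
hadoop_conf.set("fs.s3a.secret.key", "XXXXXXXXXXXXXXXXX")
hadoop_conf.set("fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com")
# ...then use "s3a://big-query-to-rs/rs_temp_data" as the tempdir above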
