I want to create an Apache Spark DataFrame from an S3 resource. I've tried on AWS and on IBM Cloud Object Storage (S3-compatible); both fail with

org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360)

I'm running pyspark with

./pyspark --packages com.amazonaws:aws-java-sdk-pom:1.11.828,org.apache.hadoop:hadoop-aws:2.7.0

I'm setting the S3 configuration for IBM with

sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "xx")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "xx")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-de.cloud-object-storage.appdomain.cloud")

Or AWS with

sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "xx")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", " xx ")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com")

In both cases the following code:

df = spark.read.csv("s3a://drill-test/cases.csv")

fails with the exception:

org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360)

There are 3 best solutions below


First of all, you should take a look at the exception; it doesn't provide much information:

https://spark.apache.org/docs/1.2.2/api/java/org/apache/spark/util/TaskCompletionListenerException.html

There is one case I can think of, which is a permission error, on both S3 and IBM Cloud. Are you accessing a public link on S3, or a private one? If it is private, you should dig deeper into the link permissions.
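A quick way to rule out a permissions problem is to fetch the same object outside Spark. This is only a sketch, assuming HMAC-style credentials and the boto3 library; the bucket, key, and IBM endpoint below are copied from the question, so adjust them (or swap in the AWS endpoint) for your own test:

import boto3

# If this head_object call fails with 403/404, the problem is credentials or
# bucket permissions rather than Spark itself.
s3 = boto3.client(
    "s3",
    aws_access_key_id="xx",
    aws_secret_access_key="xx",
    endpoint_url="https://s3.eu-de.cloud-object-storage.appdomain.cloud",
)
head = s3.head_object(Bucket="drill-test", Key="cases.csv")
print(head["ContentLength"])  # should match the 2,250,236 bytes Spark expected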


This is probably very confusing for you.

The error below:

org.apache.spark.util.TaskCompletionListenerException: Premature end of Content-Length delimited message body (expected: 2,250,236; received: 16,360)

is S3 telling you that something went wrong in the communication with S3. My guess is that you are on an older version of Spark that does not recognize the exception, so it attempts to bring the file back as the XML error message.

The settings below should help with your situation; place them above your read call and fill in <aws_key>, <aws_secret>, and <aws_region>:

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
# s3a reads fs.s3a.access.key / fs.s3a.secret.key (not the old s3n-style names)
hadoop_conf.set("fs.s3a.access.key", "<aws_key>")
hadoop_conf.set("fs.s3a.secret.key", "<aws_secret>")
# keep a single fs.s3a.impl entry; also setting NativeS3FileSystem here would
# override the S3A filesystem and break s3a:// reads
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# V4 signing is really a JVM system property (-Dcom.amazonaws.services.s3.enableV4=true)
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
# use the regional endpoint, e.g. s3.us-west-2.amazonaws.com
hadoop_conf.set("fs.s3a.endpoint", "s3.<aws_region>.amazonaws.com")

Good luck!


Spent days trying to figure this out. I had openjdk version "1.8.0_265", which was giving me that exact error when trying to read from IBM Cloud Object Storage (COS). Changing my Java version to openjdk "1.8.0_272" made it work.
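As a quick sanity check (only a sketch, and it reports the driver's JVM, not the executors'), you can confirm which Java version PySpark is actually running through the py4j gateway:

# Print the Java version of the JVM backing the PySpark driver.
print(spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))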