I'm trying to load all incoming Parquet files from an S3 bucket and process them with Delta Lake, but I'm getting an exception.
val df = spark.readStream().parquet("s3a://$bucketName/")
df.select("unit") // filter data!
    .writeStream()
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpointFolder)
    .start(bucketProcessed) // output goes in another bucket
    .awaitTermination()
It throws an exception because the reference "unit" is ambiguous.
I've tried debugging it. For some reason, it finds "unit" twice.
What is going on here? Could it be an encoding issue?
Edit: This is how I create the Spark session:
val spark = SparkSession.builder()
    .appName("streaming")
    .master("local")
    .config("spark.hadoop.fs.s3a.endpoint", endpoint)
    .config("spark.hadoop.fs.s3a.access.key", accessKey)
    .config("spark.hadoop.fs.s3a.secret.key", secretKey)
    .config("spark.hadoop.fs.s3a.path.style.access", true)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", true)
    .config("spark.sql.caseSensitive", true)
    .config("spark.sql.streaming.schemaInference", true)
    .config("spark.sql.parquet.mergeSchema", true)
    .orCreate
Edit 2: Output from df.printSchema():
2020-10-21 13:15:33,962 [main] WARN org.apache.spark.sql.execution.datasources.DataSource - Found duplicate column(s) in the data schema and the partition schema: `unit`;
root
|-- unit: string (nullable = true)
|-- unit: string (nullable = true)
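The warning names both the data schema and the partition schema, and Spark infers partition columns from directory names of the form "name=value". One possible explanation, which is only a guess and not confirmed, is that the object keys in the bucket contain a "unit=..." segment while the Parquet files also store a "unit" column. A minimal Kotlin sketch for checking keys for such segments follows. The helper partitionColumnsInKey and the sample keys are made up for illustration, and the real bucket layout is not known.

```kotlin
// Hypothetical helper (not part of Spark): returns the column names that
// Hive-style partition discovery would read from "name=value" path segments.
fun partitionColumnsInKey(objectKey: String): List<String> =
    objectKey.split("/")
        .dropLast(1)                      // the last segment is the file name
        .filter { it.contains("=") }      // keep only "name=value" directories
        .map { it.substringBefore("=") }  // take the name part
        .filter { it.isNotEmpty() }

fun main() {
    // Made-up object keys; replace them with keys listed from the real bucket.
    val partitioned = "incoming/unit=kg/part-00000.parquet"
    val flat = "incoming/part-00000.parquet"

    println(partitionColumnsInKey(partitioned)) // [unit]
    println(partitionColumnsInKey(flat))        // []
}
```

If a real key does contain a "unit=..." segment, that would account for the second "unit" in the schema. If none does, this guess can be ruled out.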
Reading the same data like this...
...solves the issue, for whatever reason. I would love to know why... :(