Reference 'unit' is ambiguous, could be: unit, unit


I'm trying to load all incoming parquet files from an S3 bucket and process them with Delta Lake, but I'm getting an exception.

val df = spark.readStream().parquet("s3a://$bucketName/")

df.select("unit") //filter data!
        .writeStream()
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpointFolder)
        .start(bucketProcessed) //output goes in another bucket
        .awaitTermination()

It throws an exception because "unit" is ambiguous:

Reference 'unit' is ambiguous, could be: unit, unit

I've tried debugging it. For some reason, it finds "unit" twice.


What is going on here? Could it be an encoding issue?
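
One way to rule out an encoding problem would be a one-off batch read of the same path, printing the raw column names and their lengths (a minimal sketch; it assumes a plain batch read of the bucket works from this session):

// Hypothetical check: identical, equal-length names would point away from
// an encoding artefact and towards a genuine duplicate column.
spark.read()
    .parquet("s3a://$bucketName/")
    .columns()
    .forEach { name -> println("'$name' (${name.length} chars)") }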

Edit: this is how I create the Spark session:

val spark = SparkSession.builder()
    .appName("streaming")
    .master("local")
    .config("spark.hadoop.fs.s3a.endpoint", endpoint)
    .config("spark.hadoop.fs.s3a.access.key", accessKey)
    .config("spark.hadoop.fs.s3a.secret.key", secretKey)
    .config("spark.hadoop.fs.s3a.path.style.access", true)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", true)
    .config("spark.sql.caseSensitive", true)
    .config("spark.sql.streaming.schemaInference", true)
    .config("spark.sql.parquet.mergeSchema", true)
    .orCreate

Edit 2: output from df.printSchema():

2020-10-21 13:15:33,962 [main] WARN  org.apache.spark.sql.execution.datasources.DataSource -  Found duplicate column(s) in the data schema and the partition schema: `unit`;
root
 |-- unit: string (nullable = true)
 |-- unit: string (nullable = true)
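
The warning suggests the second "unit" comes from partition discovery rather than from the files themselves, which would mean the bucket is laid out with unit=<value>/ directories under the root (an assumption; the actual layout isn't shown above). A quick check is to batch-read one such directory directly: with that directory as the root path, no partition column is inferred, so "unit" should appear only once:

// Sketch under the layout assumption above; "unit=A" is a placeholder for
// one of the real unit=... directories in the bucket.
spark.read()
    .parquet("s3a://$bucketName/unit=A/")
    .printSchema() // expect a single `unit` field coming from the parquet files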

Accepted answer:

Reading the same data like this...

val df = spark.readStream().parquet("s3a://$bucketName/*")

...solves the issue, for whatever reason. I would love to know why... :(
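
A plausible explanation is that the glob changes which directories Spark treats as root paths: the pattern is expanded first, each matched unit=<value> directory becomes its own root, and partition discovery no longer turns the directory names into a second "unit" column (still assuming the layout described above). A sketch of an alternative that sidesteps streaming schema inference entirely is to pass an explicit schema and keep the glob:

import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructType

// Sketch: declare only the column(s) actually needed instead of relying on
// spark.sql.streaming.schemaInference; `unit` as a string matches the
// printSchema output above. Add further fields as required.
val unitSchema = StructType().add("unit", DataTypes.StringType)

val df = spark.readStream()
    .schema(unitSchema)
    .parquet("s3a://$bucketName/*")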