How to ingest files from volume using autoloader on databricks


I am doing a test run: I upload files to a volume, then use Auto Loader to ingest them and create a table. I am getting this error message:


com.databricks.sql.cloudfiles.errors.CloudFilesIllegalStateException: The container in the file event {"backfill":{"bucket":"root@dbstoragepdarecwn6h6go","key":"7019658555662308/FileStore/LiveDataUpload/wgs_hpo_test/2127020.HPO.txt","size":77,"eventTime":1703107647000}} is different from expected by the source: unitycatalog@bgdatabricksstoragev2.


Here is the code that ingests the files from the specified location:


import dlt
from pyspark.sql.functions import col, current_timestamp, split

# Define variables used in the code below
file_path = "/Volumes/bgem_dev/wgs_live/hpo/"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = "bgem_dev.wgs_live.hpo_test"
checkpoint_path = f"/tmp/{username}/_checkpoint/Live"

# Clear out data from a previous run
# spark.sql(f"DROP TABLE IF EXISTS {table_name}")
# dbutils.fs.rm(checkpoint_path, True)

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "text")
  .load(file_path)
  .select("*",
          col("_metadata.file_path").alias("source_file"),
          current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name)
)
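For context, the error message compares two containers: the file event was recorded against root@dbstoragepdarecwn6h6go (the legacy DBFS root, which is where a /tmp checkpoint path resolves), while the source expects unitycatalog@bgdatabricksstoragev2 (the container backing the Unity Catalog volume). My working assumption is that the checkpoint and the source need to agree, so one option is to drop the old checkpoint and keep the new one under the volume itself. This is only a sketch of the path derivation, using the paths from my code above; the _checkpoint/hpo_test suffix is my own naming, not anything Databricks requires:

```python
# Sketch (assumption): derive the Auto Loader checkpoint location from the
# Unity Catalog volume itself, so the checkpoint metadata and the source
# files live in the same storage container.
file_path = "/Volumes/bgem_dev/wgs_live/hpo/"

# Avoid /tmp/... here, since that resolves to the legacy DBFS root container
# (root@dbstoragepdarecwn6h6go in the error message) rather than the
# Unity Catalog container (unitycatalog@bgdatabricksstoragev2).
checkpoint_path = file_path.rstrip("/") + "/_checkpoint/hpo_test"

print(checkpoint_path)
```

After deleting the stale checkpoint directory, the stream would be pointed at this new checkpoint_path via .option("checkpointLocation", checkpoint_path) so Auto Loader rebuilds its state against the expected container.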