I have a scenario with two different buckets where my source data lands, and I want to read and process both into bronze data.
The data inside each of those buckets has a totally different format. How can I achieve this? I tried setting up two Auto Loaders inside a single live table, but it doesn't seem to work (note that my live table originally had a single Auto Loader and I added a second one).
Here is code similar to what I've used:
import dlt

@dlt.create_table(
    name=table_name,
    comment="Bronze Data",
    path=table_path,
    partition_cols=["user_id", "event_date"],
    schema=BRONZ_DATA_SCHEMA,
    table_properties={"pipelines.reset.allowed": "false"},
)
def dlt_bronz_data():
    # First source: JSON files read via Auto Loader
    df_source = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.useIncrementalListing", True)
        .load(source_bucket_one)
    )
    df_bronze = source_bucket_one2bronze(spark, df_source)
    # Second source: semicolon-delimited CSV files read via Auto Loader
    df_source_two = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("delimiter", ";")
        .option("header", True)
        .option("cloudFiles.inferColumnTypes", True)
        .option("cloudFiles.schemaLocation", sftp_schema_location)
        .option("cloudFiles.schemaHints", "timestamp TIMESTAMP, metric STRING, value DOUBLE")
.option("cloudFiles.useNotifications", True)
.option("cloudFiles.backfillInterval", "1 week")
.load(source_bucket_two)
)
    df_bronze_two = source_bucket_two2bronze(spark, JOB_NAME, df_source_two)

    # Combine both streams into a single bronze DataFrame
    df_bronze = df_bronze.union(df_bronze_two)
    return df_bronze
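
For reference, the direction I'm considering instead (based on my reading of the DLT docs) is to create the target streaming table once and attach one flow per source with dlt.create_streaming_table and @dlt.append_flow, so each format gets its own Auto Loader stream. The sketch below is untested; the flow names bronze_from_json and bronze_from_csv are placeholders, and I'm reusing my existing source_bucket_one2bronze / source_bucket_two2bronze helpers:

import dlt

# Create the target table once; both flows below append into it.
dlt.create_streaming_table(
    name=table_name,
    comment="Bronze Data",
    schema=BRONZ_DATA_SCHEMA,
    partition_cols=["user_id", "event_date"],
    table_properties={"pipelines.reset.allowed": "false"},
)

@dlt.append_flow(target=table_name)
def bronze_from_json():
    # Same JSON reader as above, wired into its own flow.
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.useIncrementalListing", True)
        .load(source_bucket_one)
    )
    return source_bucket_one2bronze(spark, df)

@dlt.append_flow(target=table_name)
def bronze_from_csv():
    # Same CSV reader as above, wired into its own flow.
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("delimiter", ";")
        .option("header", True)
        .option("cloudFiles.inferColumnTypes", True)
        .option("cloudFiles.schemaLocation", sftp_schema_location)
        .load(source_bucket_two)
    )
    return source_bucket_two2bronze(spark, JOB_NAME, df)

Is this append_flow pattern the right way to combine two differently formatted sources into one bronze table, or is it better practice to land each format in its own bronze table first?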