Creating a table in Pyspark within a Delta Live Table job in Databricks


I am running a DLT (Delta Live Tables) job that builds a bronze table and then a silver table for two separate source tables. At the end I have two separate gold tables, which I want to merge into one table. I know how to do this in SQL, but every time I run the job with a %sql cell in a Databricks notebook it gives me this error:

Magic commands (e.g. %py, %sql and %run) are not supported with the exception of
%pip within a Python notebook. Cells containing magic commands are ignored.
Unsupported magic commands were found in the following notebooks

I would do it in PySpark, but as far as I can tell it has no CREATE TABLE functionality.
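(Note for context: a plain SQL string can still be executed from a Python cell via spark.sql(), with no magic command; a minimal sketch with placeholder table names follows. Inside a DLT pipeline, though, tables are managed through the dlt API rather than ad-hoc statements like this.)

# Sketch only: spark.sql() runs SQL from a Python cell without a %sql magic command.
# gold_table_1 / gold_table_2 / gold_combined are placeholder names.
merged_df = spark.sql("""
    SELECT a.CustomerID, a.UnitPrice, a.Quantity
    FROM gold_table_1 a
    JOIN gold_table_2 b ON a.CustomerID = b.CustomerID
""")
merged_df.write.format("delta").saveAsTable("gold_combined")  # only outside a DLT pipeline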

Here is my code for making the bronze table

# Imports used across the snippets below
import dlt
from pyspark.sql import functions as F
from datetime import datetime

@dlt.table(name="Bronze_or",
  comment="New online retail sales data incrementally ingested from cloud object storage landing zone",
  table_properties={
    "quality": "bronze",
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "2"  # reducing the threshold to 2 instead of 5
  }
)
def Bronze_or():
  return (
    spark.readStream.format("cloudFiles")  # Auto Loader incremental ingestion
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", "true")
      .load("/**Path to raw csv data**/")
  )

Then I create a view

expect_list = {"not_null_pk": "id IS NOT NULL", "QuantityNotNeg": "Quantity >= 0"}

@dlt.view(name="Bronze_or_clean_v",
  comment="Cleansed bronze retail sales view (i.e. what will become Silver)")
@dlt.expect_all(expect_list)  # apply every expectation in the dict; failing rows are flagged, not dropped
# @dlt.expect("EMPLOYEE_ID", "EMPLOYEE_ID IS NOT NULL")
# @dlt.expect("valid_address", "address IS NOT NULL")
# @dlt.expect_or_drop("valid_operation", "operation IS NOT NULL")
def Bronze_or_clean_v():
  return (
    dlt.read_stream("Bronze_or")
      .withColumn("inputFileName", F.input_file_name())  # record the source file for each row
      .withColumn("LoadDate", F.lit(datetime.now()))     # stamp the load time
  )
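
(The commented-out expect_or_drop line hints at the drop variant. For reference, a sketch of how the same expectation dictionary could drop failing rows instead of only flagging them, using expect_all_or_drop; the view name here is hypothetical:)

@dlt.view(name="Bronze_or_clean_strict_v")  # hypothetical name
@dlt.expect_all_or_drop(expect_list)        # rows failing any expectation are dropped
def Bronze_or_clean_strict_v():
  return dlt.read_stream("Bronze_or")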

Next, I create the silver table

dlt.create_target_table(name="Silver_or",
  comment="Clean, merged retail sales data",
  table_properties={
    "quality": "silver",
    "pipelines.cdc.tombstoneGCThresholdInSeconds": "2"
  }
)
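
(create_target_table only declares the target; it is normally paired with an apply_changes call that actually feeds it. That step isn't shown in the question, so here is a minimal sketch, assuming id is the key, per the not_null_pk expectation, and that the LoadDate column added in the view serves as the sequencing column:)

from pyspark.sql.functions import col

dlt.apply_changes(
  target = "Silver_or",          # the target declared above
  source = "Bronze_or_clean_v",  # the cleansed bronze view
  keys = ["id"],                 # assumed primary key
  sequence_by = col("LoadDate")  # assumed ordering column
)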

Lastly, I build the gold table

from pyspark.sql.functions import desc

@dlt.table(name="Gold_or")
def Gold_or():
  return (
    dlt.read("Silver_or")  # returns a plain DataFrame, so columns need no LIVE. prefix
#       .filter(expr("current_page_title == 'Apache_Spark'"))
#       .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("CustomerID"))
      .select("UnitPrice", "Quantity", "CustomerID")
      #.limit(10)
  )

I run this twice for two different CSV files, so in the end I have two separate gold tables, but I want to combine them into one with selected columns.

FYI: Both tables share a foreign key.
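
Roughly, what I want is something like this (table names and the join key are placeholders):

@dlt.table(name="Gold_combined")  # placeholder name
def Gold_combined():
  g1 = dlt.read("Gold_or_1")  # first gold table (placeholder name)
  g2 = dlt.read("Gold_or_2")  # second gold table (placeholder name)
  # join on the shared foreign key and keep selected columns from each side
  return (
    g1.join(g2, on="CustomerID", how="inner")
      .select(g1["CustomerID"], g1["UnitPrice"], g1["Quantity"],
              g2["UnitPrice"].alias("UnitPrice_2"), g2["Quantity"].alias("Quantity_2"))
  )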


There is 1 best solution below


Have you considered writing it in a SQL notebook? That way you can get rid of the %sql magic commands and write SQL directly.

(Screenshot: switching the notebook's default language via the language selector.)