I have a PySpark DataFrame inside a Databricks environment that I'm trying to write to a specific schema, depending on which target environment I'm deploying with Databricks Asset Bundles. The cluster is Unity Catalog enabled.

My bundle.yml looks like this:

bundle:
  name: XXX

workspace:
  host: XXX

targets:
  dev:
    mode: development
    default: true

resources:
  jobs:
    YYY:
      name: my_job
      job_clusters:
        - job_cluster_key: my_cluster
          new_cluster:
            spark_version: 13.3.x-scala2.12
            node_type_id: Standard_D3_v2
      tasks:
        - task_key: my_task
          job_cluster_key: my_cluster
          spark_python_task:
            python_file: do_task.py

And my code looks like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my_application").getOrCreate()

# pseudocode: pick the schema name based on the target environment (dev or prod)
schema_string = dev_username_bronze | prod_bronze

df = spark.read.parquet("/my/nice/location")
df.write.saveAsTable(f"my_catalog_name.{schema_string}.my_table")

I would run my bundle with databricks bundle run -t dev (Databricks CLI v0.206.0).

Now, the schema_string line above is pseudocode, but the idea is there: how can I use the value I supplied with the -t flag inside my Python code, so that I can set schema_string properly?

Accepted answer:

For a spark_python_task, you can pass parameters using the parameters field.

reference: https://docs.databricks.com/en/workflows/jobs/jobs-2.0-api.html#jobssparkpythontask

In your YAML file:

        - task_key: spark_python_task
          existing_cluster_id: xxxxxxxxxx
          spark_python_task:
            python_file: ../src/test.py
            parameters: [value1,value2,value3,value4,"${bundle.environment}"]

test.py

import sys

# sys.argv[0] is the script path; the remaining entries are the values from parameters
print(len(sys.argv))
print(str(sys.argv))

output:

6
['/Workspace/Users/[email protected]/.bundle/xxx/dev/files/src/test.py', 'value1', 'value2', 'value3', 'value4', 'dev']
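
To apply this to the bundle from the question, you would add a parameters entry under spark_python_task (for example parameters: ["${bundle.environment}"], as in the YAML above) and read it from sys.argv inside do_task.py. Below is a minimal sketch under that assumption, using the dev/prod schema names from the question's pseudocode; the schema_by_target mapping is a hypothetical name, so adjust it to your own convention:

import sys

from pyspark.sql import SparkSession

# The first argument after the script path is the bundle target passed via parameters, e.g. "dev"
target = sys.argv[1]

# Hypothetical mapping from bundle target to Unity Catalog schema; adjust to your naming convention
schema_by_target = {
    "dev": "dev_username_bronze",
    "prod": "prod_bronze",
}
schema_string = schema_by_target[target]

spark = SparkSession.builder.appName("my_application").getOrCreate()

df = spark.read.parquet("/my/nice/location")
df.write.saveAsTable(f"my_catalog_name.{schema_string}.my_table")

Running the bundle with -t dev makes the parameter resolve to dev, so the table lands in the dev schema; a prod target defined in the same bundle would pass prod through the same mechanism.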