Parallelize MLflow Project runs with Pandas UDF on Azure Databricks Spark


I'm trying to parallelize the training of multiple time series using Spark on Azure Databricks.
Besides training, I would like to log metrics and models using MLflow.

The structure of the code is quite simple (basically adapted from this example).

  1. A Databricks notebook triggers the MLflow Project:
mlflow.run(
    uri="/dbfs/mlflow-project",
    parameters={"data_path": "dbfs:/data/", "experiment_name": "test"}, 
    experiment_id=575501044793272,
    use_conda=False,
    backend="databricks",
    backend_config={
        "new_cluster": {
            "spark_version": "9.1.x-cpu-ml-scala2.12",
            "num_workers": 8,
            "node_type_id": "Standard_DS4_v2",
        },
        "libraries": [{"pypi": {"package": "pyarrow"}}]
    },
    synchronous=False
)
  2. The main function is called. It basically executes three steps:

    1. Read the delta table indicated by the data_path provided
    2. Define a function which triggers the "train" entry point of the MLflow project
    3. Apply this function as a Pandas UDF on the Spark DataFrame

Here the code:

import click
import mlflow
import pandas as pd
from mlflow.tracking import MlflowClient
from mlflow.utils.mlflow_tags import MLFLOW_PARENT_RUN_ID
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

sc = SparkContext('local')
spark = SparkSession(sc)

@click.argument("data_path")
@click.argument("experiment_name")
def run(data_path: str, experiment_name: str):
            
    df = spark.read.format("delta").load(data_path)
    result_schema = StructType([StructField("key", StringType())])

    def forecast(data: pd.DataFrame) -> pd.DataFrame:
        child_run = client.create_run(
            experiment_id=experiment,
            tags={MLFLOW_PARENT_RUN_ID: parent_run_id},
        )
        p = mlflow.projects.run(
            run_id=child_run.info.run_id, 
            uri=".",
            entry_point="train",
            parameters={"data": data.to_json(), "run_id": child_run.info.run_id}, 
            experiment_id=experiment,
            backend="local",
            use_conda=False,
            synchronous=False,
        )

        # Just a placeholder to use pandas UDF
        out = pd.DataFrame(data={"key": ["1"]})
        return out

    client = MlflowClient()
    experiment_path = f"/mlflow/experiments/{experiment_name}"
    experiment = client.create_experiment(experiment_path)

    parent_run = client.create_run(experiment_id=experiment)
    parent_run_id = parent_run.info.run_id

    # Apply pandas UDF (count() is called just to force execution and avoid lazy evaluation)
    df.groupBy("key").applyInPandas(forecast, result_schema).count()


if __name__ == "__main__":
    run()
  3. The train function is called on each key.
    It trains a Prophet model for each time series (i.e. for each key) and logs both the parameters and the model with MLflow (a simplified sketch of such an entry point is shown below).
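
For reference, a simplified sketch of what such a train entry point could look like. The column names "ds"/"y", the Prophet settings, and the mlflow.prophet flavor below are assumptions, not the exact script from the project:

import click
import mlflow
import mlflow.prophet
import pandas as pd
from prophet import Prophet


@click.command()
@click.argument("data")
@click.argument("run_id")
def train(data: str, run_id: str):
    # Rebuild the single-key slice that the pandas UDF serialized to JSON
    df = pd.read_json(data)

    # Attach to the child run created by the forecast function
    with mlflow.start_run(run_id=run_id):
        # Assumed column layout: 'ds' (timestamp) and 'y' (target)
        model = Prophet(weekly_seasonality=True)
        model.fit(df[["ds", "y"]])

        mlflow.log_params({"weekly_seasonality": True})
        mlflow.prophet.log_model(model, artifact_path="model")


if __name__ == "__main__":
    train()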

From the cluster stderr and stdout I can see that the pandas UDF is applied correctly: the data is split by the "key" column and processed one time series at a time.

The problem is that, monitoring the cluster usage, only one node is used, the driver node: the work is not distributed across the available workers, even though the pandas UDF appears to be applied correctly.

What might be the issue here? Let me know if I should provide more details.

Thank you very much in advance, Matteo

There are 2 answers below.

Answer 1:

It seems you need to repartition the input DataFrame; otherwise Spark will see a single-partition DataFrame and process it accordingly (see the sketch below).
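
For example, a minimal sketch of that suggestion applied to the question's code (it reuses spark, data_path, forecast and result_schema from the question; "key" is the grouping column already used there):

# Repartition by the grouping column so the groups can be spread across tasks
df = spark.read.format("delta").load(data_path)
df = df.repartition("key")  # or df.repartition(num_partitions, "key") for an explicit partition count

df.groupBy("key").applyInPandas(forecast, result_schema).count()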

Answer 2:

In the code, the Spark config sets 'local' as the master, so execution is limited to the driver and to at most as many partitions as that machine has CPU cores. This removes the benefit of parallelization and therefore the speed-up. To fully benefit from parallelization, a multi-node master ('yarn' or similar) should be used and the input DataFrame should be repartitioned accordingly. In the code, each 'click'-decorated entry point will also be processed by the driver as a single job graph. One possible way to apply this on Databricks is sketched below.
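
A sketch of that idea on Databricks (not part of the original answer; data_path is reused from the question): attach to the cluster's existing session instead of constructing a local one:

from pyspark.sql import SparkSession

# Reuse the session of the cluster the job runs on instead of SparkContext('local'),
# which confines all work to the driver's cores
spark = SparkSession.builder.getOrCreate()

df = spark.read.format("delta").load(data_path)
print(df.rdd.getNumPartitions())  # should reflect the cluster's parallelism, not a single local core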

A similar question was discussed in:

Why is Pandas UDF not being parallelized?