I'm trying to parallelize the training of multiple time-series using Spark on Azure Databricks.
In addition to training, I would like to log metrics and models using MLflow.
The structure of the code is quite simple (basically adapted from this example).
- A Databricks notebook triggers the MLflow Project
mlflow.run(
    uri="/dbfs/mlflow-project",
    parameters={"data_path": "dbfs:/data/", "experiment_name": "test"},
    experiment_id=575501044793272,
    use_conda=False,
    backend="databricks",
    backend_config={
        "new_cluster": {
            "spark_version": "9.1.x-cpu-ml-scala2.12",
            "num_workers": 8,
            "node_type_id": "Standard_DS4_v2",
        },
        "libraries": [{"pypi": {"package": "pyarrow"}}],
    },
    synchronous=False,
)
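For context, the project exposes a "main" and a "train" entry point. A minimal sketch of what the MLproject file could look like, assuming the parameter names used in the calls in this post (the commands and file names are placeholders, not the actual ones):

name: mlflow-project

entry_points:
  main:
    parameters:
      data_path: string
      experiment_name: string
    command: "python main.py {data_path} {experiment_name}"
  train:
    parameters:
      data: string
      run_id: string
    command: "python train.py {data} {run_id}"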
The main function is called. It basically executes three steps:
- Read the delta table indicated by the data_path provided
- Define a function which triggers the "train" entry point of the MLflow project
- Apply this function as a Pandas UDF on the Spark DataFrame
Here is the code:
import click
import pandas as pd

import mlflow
from mlflow.tracking import MlflowClient
from mlflow.utils.mlflow_tags import MLFLOW_PARENT_RUN_ID
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

sc = SparkContext('local')
spark = SparkSession(sc)


@click.command()
@click.argument("data_path")
@click.argument("experiment_name")
def run(data_path: str, experiment_name: str):
    df = spark.read.format("delta").load(f"{data_path}")

    result_schema = StructType([StructField("key", StringType())])

    def forecast(data: pd.DataFrame) -> pd.DataFrame:
        # Create a child run nested under the parent run, then launch the
        # "train" entry point of the project for this key's time series
        child_run = client.create_run(
            experiment_id=experiment,
            tags={MLFLOW_PARENT_RUN_ID: parent_run_id},
        )
        p = mlflow.projects.run(
            run_id=child_run.info.run_id,
            uri=".",
            entry_point="train",
            parameters={"data": data.to_json(), "run_id": child_run.info.run_id},
            experiment_id=experiment,
            backend="local",
            use_conda=False,
            synchronous=False,
        )
        # Just a placeholder to satisfy the pandas UDF return schema
        out = pd.DataFrame(data={"key": ["1"]})
        return out

    client = MlflowClient()
    experiment_path = f"/mlflow/experiments/{experiment_name}"
    experiment = client.create_experiment(experiment_path)
    parent_run = client.create_run(experiment_id=experiment)
    parent_run_id = parent_run.info.run_id

    # Apply pandas UDF (count() used just to avoid lazy evaluation)
    df.groupBy("key").applyInPandas(forecast, result_schema).count()


if __name__ == "__main__":
    run()
- The train function is called on each key.
It basically trains a Prophet model for each time series (i.e. for each key) and logs both parameters and the model for it; a rough sketch is shown below.
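The actual train entry point is more involved, but a minimal sketch of what it does could look roughly like this (the logged parameter, the package name, and the assumption that the data already has Prophet's "ds"/"y" columns are placeholders; mlflow.prophet.log_model requires an MLflow version that ships the prophet flavor):

import click
import pandas as pd

import mlflow
from prophet import Prophet


@click.command()
@click.argument("data")
@click.argument("run_id")
def train(data: str, run_id: str):
    # Rebuild the single-key time series serialized by the pandas UDF
    series = pd.read_json(data)

    # Attach to the child run created by the driver-side code
    with mlflow.start_run(run_id=run_id):
        # Prophet expects the columns "ds" (timestamp) and "y" (value)
        model = Prophet()
        model.fit(series)

        # Placeholder parameter and model logging
        mlflow.log_param("n_observations", len(series))
        mlflow.prophet.log_model(model, artifact_path="model")


if __name__ == "__main__":
    train()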
From the cluster stderr and stdout I can see that the pandas UDF is applied correctly, since the data is split by the "key" column and processed one time series at a time.
The problem is that, monitoring cluster usage, only one node is used, the driver node: the work is not distributed across the available workers, even though the pandas UDF appears to be applied correctly.
What might be the issue here? Should I provide more details?
Thank you very much in advance, Matteo
It seems you need to repartition the input DataFrame. Otherwise Spark will see a single-partition DataFrame and process it accordingly.
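A minimal sketch of what that could look like, reusing the names from your code and assuming you repartition by the same "key" column used for grouping:

# Repartition by the grouping column so the groups are spread across
# the cluster before the grouped pandas UDF is applied
df = df.repartition("key")

df.groupBy("key").applyInPandas(forecast, result_schema).count()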