I am currently working on a Databricks cluster, trying to log an ALS model within an MLflow run. Across multiple approaches I either get a TypeError ("cannot pickle '_thread.RLock' object") that stops my run, or an OSError ("No such file or directory: '/tmp/tmpxiznhskj/sparkml'") that does not stop the run but leaves me unable to load the model back into my code.
Here is the prep code to play around with:
import mlflow
import logging
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark import SparkContext, SparkConf
data = [{"User": 1, "Item": 1, "Rating": 1},
        {"User": 2, "Item": 2, "Rating": 3},
        {"User": 3, "Item": 3, "Rating": 1},
        {"User": 4, "Item": 2, "Rating": 4},
        {"User": 1, "Item": 2, "Rating": 3},
        {"User": 2, "Item": 3, "Rating": 2},
        {"User": 2, "Item": 4, "Rating": 1},
        {"User": 4, "Item": 1, "Rating": 5}]
conf = SparkConf().setAppName("ALS-mlflow-test")
sc = SparkContext.getOrCreate(conf)
rdd = sc.parallelize(data)
df_rating = rdd.toDF()
(df_train, df_test) = df_rating.randomSplit([0.8, 0.2])
logging.getLogger("mlflow").setLevel(logging.DEBUG)
- Using this as a base I tried different approaches, starting with the mlflow.sklearn.log_model method:
with mlflow.start_run() as run:
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating",
                    implicitPrefs=False, coldStartStrategy="drop")
    model_als.fit(df_train)
    mlflow.sklearn.log_model(model_als, artifact_path="test")
which results in the following error:
_SklearnCustomModelPicklingError: Pickling custom sklearn model ALS failed when saving model: cannot pickle '_thread.RLock' object
- Next I tried using a wrapper model for my ALS model (the wrapper does not actually do anything yet):
class MyModel(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        return self.my_custom_function(model_input)

    def my_custom_function(self, model_input):
        return 0

with mlflow.start_run():
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating",
                    implicitPrefs=False, coldStartStrategy="drop")
    my_model = MyModel(model_als)
    model_info = mlflow.pyfunc.log_model(artifact_path="model", python_model=my_model)
resulting in a more general but basically the same error as in step 1:
TypeError: cannot pickle '_thread.RLock' object
- I then tried putting my model within a pipeline (again, nothing fancy, just plain and simple for a test):
from pyspark.ml import Pipeline
with mlflow.start_run() as run:
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating",
                    implicitPrefs=False, coldStartStrategy="drop")
    pipeline = Pipeline(stages=[model_als])
    pipeline_model = pipeline.fit(df_train)
    mlflow.spark.log_model(pipeline_model, artifact_path="test-pipeline")
This time the code executed, but a look at the debug log shows that something still went wrong:
stderr: Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023/01/05 08:54:22 INFO mlflow.spark: File '/tmp/tmpxiznhskj/sparkml' not found on DFS. Will attempt to upload the file.
Traceback (most recent call last):
File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 162, in <module>
main()
File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 137, in main
mlflow.pyfunc.load_model(model_path)
File "/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py", line 484, in load_model
model_impl = importlib.import_module(conf[MAIN])._load_pyfunc(data_path)
File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 134, in _load_pyfunc_patch
return original(*args, **kwargs)
File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 832, in _load_pyfunc
return _PyFuncModelWrapper(spark, _load_model(model_uri=path))
File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 727, in _load_model
model_uri = _HadoopFileSystem.maybe_copy_from_uri(model_uri, dfs_tmpdir)
File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 404, in maybe_copy_from_uri
return cls.maybe_copy_from_local_file(_download_artifact_from_uri(src_uri), dst_path)
File "/databricks/python/lib/python3.9/site-packages/mlflow/tracking/artifact_utils.py", line 100, in _download_artifact_from_uri
return get_artifact_repository(artifact_uri=root_uri).download_artifacts(
File "/databricks/python/lib/python3.9/site-packages/mlflow/store/artifact/local_artifact_repo.py", line 79, in download_artifacts
raise IOError("No such file or directory: '{}'".format(local_artifact_path))
OSError: No such file or directory: '/tmp/tmpxiznhskj/sparkml'
- Nevertheless, I tried loading the model, since the code ran through:
from pyspark.ml import PipelineModel
logged_model = 'runs:/xyz123/test'
# Load model
loaded_model = PipelineModel.load(logged_model)
results in the error:
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "runs"
- I then tried the auto-generated code from Databricks:
import mlflow
logged_model = 'runs:/xyz123/test'
# Load model
loaded_model = mlflow.spark.load_model(logged_model)
# Perform inference via model.transform()
loaded_model.transform(data)
results in the following error:
AttributeError: 'list' object has no attribute '_jdf'
- The last piece of code would be the following (but I was not able to get to it yet, since I can't load the model):
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
df_pred = loaded_model.transform(df_test)
rmse = evaluator.evaluate(df_pred)
df_pred.display()
print("Root-mean-square error explicit = " + str(rmse))
user_recs = loaded_model.recommendForAllUsers(2)
user_recs.display()
In conclusion, what I am trying to achieve is simply logging the provided ALS model within my MLflow run. I have run out of ideas about what could be wrong or what else I could try.
Thanks in advance!
As of MLflow 2.1.1, pyspark.ml.recommendation.ALS is not on the allowlist. Using the pipeline as you tested in #3 is the appropriate way to log unsupported spark.ml models. It looks like you might have hit an environment issue when logging your model, as I was able to get the following implementation working both on Databricks and locally.
Log within run
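In essence this is the same pipeline approach as your #3; a minimal sketch, reusing df_train from your prep code (the artifact path "als-pipeline" is just a placeholder name):

import mlflow
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS

with mlflow.start_run() as run:
    als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating",
              coldStartStrategy="drop")
    pipeline_model = Pipeline(stages=[als]).fit(df_train)
    # log the *fitted* PipelineModel with the spark flavor
    mlflow.spark.log_model(pipeline_model, artifact_path="als-pipeline")
    run_id = run.info.run_id  # keep the run id for registering/loading later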
Reload for inference
On Databricks you need to first move the model from the experiment into the model registry. This can be done via the UI or with the following commands.
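Something along these lines should work (the registered model name "als_test" is a placeholder; run_id comes from the logging step above):

import mlflow

model_uri = f"runs:/{run_id}/als-pipeline"
# register the run artifact as a new version in the model registry
model_version = mlflow.register_model(model_uri, "als_test")

# load the registered version back with the spark flavor and run inference
loaded_model = mlflow.spark.load_model(f"models:/als_test/{model_version.version}")
df_pred = loaded_model.transform(df_test)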
Log directly to registry
Alternatively, you can also log directly to the model registry.
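Roughly like this, reusing the als estimator and df_train from above: mlflow.spark.log_model accepts a registered_model_name, which creates the registry entry in the same call ("als_test" is again a placeholder name).

with mlflow.start_run():
    pipeline_model = Pipeline(stages=[als]).fit(df_train)
    mlflow.spark.log_model(
        pipeline_model,
        artifact_path="als-pipeline",
        registered_model_name="als_test",  # logs the artifact and registers it in one step
    )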
Logging as custom pyfunc
I also tried wrapping the ALS model and the pipeline in a custom pyfunc, and in both cases I received the exact same error. I believe there is something unserializable in the ALS model that prevents this...
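For reference, the wrapper I tried looked roughly like this (a sketch only; logging it fails with the same "cannot pickle '_thread.RLock' object" error, since the fitted Spark model holds JVM handles that cannot be pickled):

class PipelineWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, spark_model):
        self.spark_model = spark_model  # fitted PipelineModel (or ALSModel)

    def predict(self, context, model_input):
        return self.spark_model.transform(model_input)

with mlflow.start_run():
    # raises TypeError: cannot pickle '_thread.RLock' object
    mlflow.pyfunc.log_model(artifact_path="als-pyfunc",
                            python_model=PipelineWrapper(pipeline_model))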