I have a pipeline model (random forest) logged using mlflow:
stages = (feature_stage_list
          + label_stage_list
          + assembler_stage_list
          + classifier_stage_list
          + label_converter_stage_list)
pipeline = Pipeline(stages=stages)
rf_model = pipeline.fit(input_df)
mlflow.spark.log_model(spark_model=rf_model, artifact_path='model', registered_model_name=registered_model_name)
the model is also saved on HDFS using:
rf_model.save(output_path)
Now, when I try to load the same model elsewhere (to test it on new data) using mlflow.spark.load_model:
rf_model = mlflow.spark.load_model(input_path)
the following error is raised:
Caused by: java.lang.AssertionError: assertion failed: Decision Tree load failed. Expected largest node ID to be 45, but found 22
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.ml.tree.DecisionTreeModelReadWrite$.buildTreeFromNodes(treeModels.scala:436)
at org.apache.spark.ml.tree.EnsembleModelReadWrite$.$anonfun$loadImpl$6(treeModels.scala:547)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:42)
at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:322)
at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:320)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
I should mention that loading the same model from HDFS with the following works just fine:
rf_model = PipelineModel.load(input_path)
NOTE: The same machine/environment is used to save and load the models, so this is not a case of loading a model generated by a different Spark version.
NOTE: mlflow.spark.load_model works fine when used to load other model types, such as logistic regression.