PySpark parallelize fails with PicklingError: CONTEXT_ONLY_VALID_ON_DRIVER

I am trying to train multiple decision trees in parallel with PySpark by distributing the training loop with sparkContext.parallelize:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import monotonically_increasing_id, col, lit, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from sklearn.datasets import load_wine
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import math

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CombinedCode") \
    .getOrCreate()

# Load Wine dataset from sklearn
wine_sklearn = load_wine()

# Convert sklearn dataset to pandas DataFrame
wine_data = pd.DataFrame(data=wine_sklearn.data, columns=wine_sklearn.feature_names)
wine_data['label'] = wine_sklearn.target

# Convert pandas DataFrame to Spark DataFrame
data = spark.createDataFrame(wine_data)

# Perform data preprocessing
feature_cols = [col for col in data.columns if col != 'label']
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = vector_assembler.transform(data)

# Define missing functions (equal_frequency_binning and calculate_gini_optimized)
def equal_frequency_binning(values):
    values = sorted(values)
    distinct_values = list(set(values))
    counts = {val: values.count(val) for val in distinct_values}
    bins = int(math.sqrt(len(distinct_values)))
    S = sum(counts.values()) / bins if bins != 0 else 0  # Avoid division by zero
    split_points = []
    current_bin = []
    for val in values:
        current_bin.append(val)
        if counts[val] >= S:
            split_points.append((current_bin[-1] + current_bin[-2]) / 2)
            current_bin = []
            bins -= 1
            if bins == 1:
                break
    return split_points

def calculate_gini_optimized(data, split_val):
    left_count = data.filter(data["label"] < split_val).count()
    right_count = data.filter(data["label"] >= split_val).count()
    total_count = left_count + right_count
    
    left_gini = 1 - sum([(data.filter(data["label"] == label).count() / left_count) ** 2 for label in range(int(split_val))])
    right_gini = 1 - sum([(data.filter(data["label"] == label).count() / right_count) ** 2 for label in range(int(split_val), data.select("label").distinct().count())])
    
    gini = (left_count / total_count) * left_gini + (right_count / total_count) * right_gini
    
    return gini

# Add an index column to the training data
data = data.withColumn("index", monotonically_increasing_id())

# Generate Forest Sampling Index (FSI) table
fsi_table = data.select("index", "features").distinct()

# Split data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Train Random Forest Classifier
rf = RandomForestClassifier(featuresCol="features", labelCol="label")

# Define a function to train decision tree
def train_decision_tree(i):
    # Generate FSI table for each tree
    fsi_table_tree = fsi_table.withColumnRenamed("features", f"features_tree_{i}")
    train_data_tree = train_data.join(fsi_table_tree, on="index", how="inner")
    
    # Calculate new Gini coefficient for each feature
    gini_coefficients = {}
    for feature_col in feature_cols:
        gini_coefficients[feature_col] = calculate_gini_optimized(train_data_tree, 0.5)
    
    # Perform equal-frequency binning for continuous features
    num_bins = 10  # Number of bins
    for feature_col in feature_cols:
        feature_values = train_data_tree.select(feature_col).rdd.flatMap(lambda x: x).collect()
        if len(set(feature_values)) > num_bins:
            split_points = equal_frequency_binning(feature_values)
            train_data_tree = train_data_tree.withColumn(feature_col, lit(split_points))
    
    # Train decision tree on subset of data
    print(f"Training decision tree {i+1}/{num_trees}...")
    # Train Random Forest Classifier
    rf = RandomForestClassifier(featuresCol="features", labelCol="label")

    return rf.fit(train_data_tree)

# Train multiple decision trees in parallel
num_trees = 10  # Number of decision trees
#models = [train_decision_tree(i) for i in range(num_trees)]
# Parallelize model training process
models = spark.sparkContext.parallelize(range(num_trees)).mapPartitions(train_decision_tree).collect()

# Aggregate results from all decision trees
def aggregate_predictions(data, models):
    predictions = None
    for model in models:
        if predictions is None:
            predictions = model.transform(data)
        else:
            predictions = predictions.union(model.transform(data))
    return predictions

# Aggregate predictions for test data
predictions = aggregate_predictions(test_data, models)

# Convert predictions to Pandas DataFrame for evaluation
predictions_df = predictions.select("label", "prediction").toPandas()

# Calculate accuracy
accuracy = accuracy_score(predictions_df["label"], predictions_df["prediction"])
print(f"Model Accuracy: {accuracy}")

# Stop Spark session
spark.stop()

The above code fails with the error below:

PySparkRuntimeError                       Traceback (most recent call last)
File ~/anaconda3/lib/python3.11/site-packages/pyspark/serializers.py:459, in CloudPickleSerializer.dumps(self, obj)
    458 try:
--> 459     return cloudpickle.dumps(obj, pickle_protocol)
    460 except pickle.PickleError:

File ~/anaconda3/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     70 cp = CloudPickler(
     71     file, protocol=protocol, buffer_callback=buffer_callback
     72 )
---> 73 cp.dump(obj)
     74 return file.getvalue()

File ~/anaconda3/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py:632, in CloudPickler.dump(self, obj)
    631 try:
--> 632     return Pickler.dump(self, obj)
    633 except RuntimeError as e:

File ~/anaconda3/lib/python3.11/site-packages/pyspark/context.py:466, in SparkContext.__getnewargs__(self)
    464 def __getnewargs__(self) -> NoReturn:
    465     # This method is called when attempting to pickle SparkContext, which is always an error:
--> 466     raise PySparkRuntimeError(
    467         error_class="CONTEXT_ONLY_VALID_ON_DRIVER",
    468         message_parameters={},
    469     )

PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
Cell In[19], line 106
    103 num_trees = 10  # Number of decision trees
    104 #models = [train_decision_tree(i) for i in range(num_trees)]
    105 # Parallelize model training process
--> 106 models = spark.sparkContext.parallelize(range(num_trees)).mapPartitions(train_decision_tree).collect()
    108 # Aggregate results from all decision trees
    109 def aggregate_predictions(data, models):

File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:1833, in RDD.collect(self)
   1831 with SCCallSiteSync(self.context):
   1832     assert self.ctx._jvm is not None
-> 1833     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1834 return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:5470, in PipelinedRDD._jrdd(self)
   5467 else:
   5468     profiler = None
-> 5470 wrapped_func = _wrap_function(
   5471     self.ctx, self.func, self._prev_jrdd_deserializer, self._jrdd_deserializer, profiler
   5472 )
   5474 assert self.ctx._jvm is not None
   5475 python_rdd = self.ctx._jvm.PythonRDD(
   5476     self._prev_jrdd.rdd(), wrapped_func, self.preservesPartitioning, self.is_barrier
   5477 )

File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:5268, in _wrap_function(sc, func, deserializer, serializer, profiler)
   5266 assert serializer, "serializer should not be empty"
   5267 command = (func, profiler, deserializer, serializer)
-> 5268 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   5269 assert sc._jvm is not None
   5270 return sc._jvm.SimplePythonFunction(
   5271     bytearray(pickled_command),
   5272     env,
   (...)
   5277     sc._javaAccumulator,
   5278 )

File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:5251, in _prepare_for_python_RDD(sc, command)
   5248 def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]:
   5249     # the serialized command will be compressed by broadcast
   5250     ser = CloudPickleSerializer()
-> 5251     pickled_command = ser.dumps(command)
   5252     assert sc._jvm is not None
   5253     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   5254         # The broadcast will have same life cycle as created PythonRDD

File ~/anaconda3/lib/python3.11/site-packages/pyspark/serializers.py:469, in CloudPickleSerializer.dumps(self, obj)
    467     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    468 print_exec(sys.stderr)
--> 469 raise pickle.PicklingError(msg)

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
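From the traceback, my understanding is that cloudpickle fails because train_decision_tree closes over train_data, fsi_table and rf, and the two DataFrames carry a reference to the driver-only SparkContext, so the function cannot be shipped to the executors (SPARK-5063). A minimal sketch that, I assume, reproduces the same failure mode:

# Any task function that references a DataFrame created on the driver
# ends up trying to pickle SparkContext and fails the same way.
df = spark.range(5)

def uses_driver_object(partition):
    return [df.count()]  # df holds a reference to the driver's SparkContext

spark.sparkContext.parallelize(range(2)).mapPartitions(uses_driver_object).collect()
# -> PicklingError: Could not serialize object: ... [CONTEXT_ONLY_VALID_ON_DRIVER]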

Could you please let me know how this can be resolved?
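One direction I have been considering, as a rough sketch only (it swaps the Spark ML RandomForestClassifier for scikit-learn's DecisionTreeClassifier and skips my custom binning/Gini steps), is to broadcast plain pandas data and train one tree per task, so that nothing inside the task function touches a DataFrame or the SparkContext:

from sklearn.tree import DecisionTreeClassifier

# Collect the training data once on the driver and broadcast it as picklable pandas data.
train_pdf = train_data.select(*feature_cols, "label").toPandas()
bc_train = spark.sparkContext.broadcast(train_pdf)

def train_partition(tree_ids):
    # Runs on the executors; uses only the broadcast pandas DataFrame and plain Python objects.
    pdf = bc_train.value
    fitted = []
    for i in tree_ids:
        sample = pdf.sample(frac=1.0, replace=True, random_state=i)  # bootstrap sample per tree
        clf = DecisionTreeClassifier(random_state=i)
        clf.fit(sample[feature_cols], sample["label"])
        fitted.append(clf)
    return fitted

models = (spark.sparkContext
          .parallelize(range(num_trees), num_trees)
          .mapPartitions(train_partition)
          .collect())

I have not wired this into the rest of my pipeline (aggregate_predictions still expects Spark ML models), so I am not sure it is the right approach.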
