from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import monotonically_increasing_id, col, lit, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from sklearn.datasets import load_wine
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import math
# Initialize Spark session
spark = SparkSession.builder \
    .appName("CombinedCode") \
    .getOrCreate()
# Load Wine dataset from sklearn
wine_sklearn = load_wine()
# Convert sklearn dataset to pandas DataFrame
wine_data = pd.DataFrame(data=wine_sklearn.data, columns=wine_sklearn.feature_names)
wine_data['label'] = wine_sklearn.target
# Convert pandas DataFrame to Spark DataFrame
data = spark.createDataFrame(wine_data)
# Perform data preprocessing
feature_cols = [col for col in data.columns if col != 'label']
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = vector_assembler.transform(data)
# Define missing functions (equal_frequency_binning and calculate_gini_optimized)
def equal_frequency_binning(values):
    values = sorted(values)
    distinct_values = list(set(values))
    counts = {val: values.count(val) for val in distinct_values}
    bins = int(math.sqrt(len(distinct_values)))
    S = sum(counts.values()) / bins if bins != 0 else 0  # Avoid division by zero
    split_points = []
    current_bin = []
    for val in values:
        current_bin.append(val)
        if counts[val] >= S:
            split_points.append((current_bin[-1] + current_bin[-2]) / 2)
            current_bin = []
            bins -= 1
            if bins == 1:
                break
    return split_points
def calculate_gini_optimized(data, split_val):
    left_count = data.filter(data["label"] < split_val).count()
    right_count = data.filter(data["label"] >= split_val).count()
    total_count = left_count + right_count
    left_gini = 1 - sum([(data.filter(data["label"] == label).count() / left_count) ** 2 for label in range(int(split_val))])
    right_gini = 1 - sum([(data.filter(data["label"] == label).count() / right_count) ** 2 for label in range(int(split_val), data.select("label").distinct().count())])
    gini = (left_count / total_count) * left_gini + (right_count / total_count) * right_gini
    return gini
# Add an index column to the data (before splitting)
data = data.withColumn("index", monotonically_increasing_id())
# Generate Forest Sampling Index (FSI) table
fsi_table = data.select("index", "features").distinct()
# Split data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
# Train Random Forest Classifier
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
# Define a function to train decision tree
def train_decision_tree(i):
    # Generate FSI table for each tree
    fsi_table_tree = fsi_table.withColumnRenamed("features", f"features_tree_{i}")
    train_data_tree = train_data.join(fsi_table_tree, on="index", how="inner")
    # Calculate new Gini coefficient for each feature
    gini_coefficients = {}
    for feature_col in feature_cols:
        gini_coefficients[feature_col] = calculate_gini_optimized(train_data_tree, 0.5)
    # Perform equal-frequency binning for continuous features
    num_bins = 10  # Number of bins
    for feature_col in feature_cols:
        feature_values = train_data_tree.select(feature_col).rdd.flatMap(lambda x: x).collect()
        if len(set(feature_values)) > num_bins:
            split_points = equal_frequency_binning(feature_values)
            train_data_tree = train_data_tree.withColumn(feature_col, lit(split_points))
    # Train decision tree on subset of data
    print(f"Training decision tree {i+1}/{num_trees}...")
    # Train Random Forest Classifier
    rf = RandomForestClassifier(featuresCol="features", labelCol="label")
    return rf.fit(train_data_tree)
# Train multiple decision trees in parallel
num_trees = 10 # Number of decision trees
#models = [train_decision_tree(i) for i in range(num_trees)]
# Parallelize model training process
models = spark.sparkContext.parallelize(range(num_trees)).mapPartitions(train_decision_tree).collect()
# Aggregate results from all decision trees
def aggregate_predictions(data, models):
    predictions = None
    for model in models:
        if predictions is None:
            predictions = model.transform(data)
        else:
            predictions = predictions.union(model.transform(data))
    return predictions
# Aggregate predictions for test data
predictions = aggregate_predictions(test_data, models)
# Convert predictions to Pandas DataFrame for evaluation
predictions_df = predictions.select("label", "prediction").toPandas()
# Calculate accuracy
accuracy = accuracy_score(predictions_df["label"], predictions_df["prediction"])
print(f"Model Accuracy: {accuracy}")
# Stop Spark session
spark.stop()
The above code gives the following error:
PySparkRuntimeError Traceback (most recent call last)
File ~/anaconda3/lib/python3.11/site-packages/pyspark/serializers.py:459, in CloudPickleSerializer.dumps(self, obj)
458 try:
--> 459 return cloudpickle.dumps(obj, pickle_protocol)
460 except pickle.PickleError:
File ~/anaconda3/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
70 cp = CloudPickler(
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
File ~/anaconda3/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py:632, in CloudPickler.dump(self, obj)
631 try:
--> 632 return Pickler.dump(self, obj)
633 except RuntimeError as e:
File ~/anaconda3/lib/python3.11/site-packages/pyspark/context.py:466, in SparkContext.__getnewargs__(self)
464 def __getnewargs__(self) -> NoReturn:
465 # This method is called when attempting to pickle SparkContext, which is always an error:
--> 466 raise PySparkRuntimeError(
467 error_class="CONTEXT_ONLY_VALID_ON_DRIVER",
468 message_parameters={},
469 )
PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
Cell In[19], line 106
103 num_trees = 10 # Number of decision trees
104 #models = [train_decision_tree(i) for i in range(num_trees)]
105 # Parallelize model training process
--> 106 models = spark.sparkContext.parallelize(range(num_trees)).mapPartitions(train_decision_tree).collect()
108 # Aggregate results from all decision trees
109 def aggregate_predictions(data, models):
File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:1833, in RDD.collect(self)
1831 with SCCallSiteSync(self.context):
1832 assert self.ctx._jvm is not None
-> 1833 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
1834 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:5470, in PipelinedRDD._jrdd(self)
5467 else:
5468 profiler = None
-> 5470 wrapped_func = _wrap_function(
5471 self.ctx, self.func, self._prev_jrdd_deserializer, self._jrdd_deserializer, profiler
5472 )
5474 assert self.ctx._jvm is not None
5475 python_rdd = self.ctx._jvm.PythonRDD(
5476 self._prev_jrdd.rdd(), wrapped_func, self.preservesPartitioning, self.is_barrier
5477 )
File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:5268, in _wrap_function(sc, func, deserializer, serializer, profiler)
5266 assert serializer, "serializer should not be empty"
5267 command = (func, profiler, deserializer, serializer)
-> 5268 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
5269 assert sc._jvm is not None
5270 return sc._jvm.SimplePythonFunction(
5271 bytearray(pickled_command),
5272 env,
(...)
5277 sc._javaAccumulator,
5278 )
File ~/anaconda3/lib/python3.11/site-packages/pyspark/rdd.py:5251, in _prepare_for_python_RDD(sc, command)
5248 def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]:
5249 # the serialized command will be compressed by broadcast
5250 ser = CloudPickleSerializer()
-> 5251 pickled_command = ser.dumps(command)
5252 assert sc._jvm is not None
5253 if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M
5254 # The broadcast will have same life cycle as created PythonRDD
File ~/anaconda3/lib/python3.11/site-packages/pyspark/serializers.py:469, in CloudPickleSerializer.dumps(self, obj)
467 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
468 print_exec(sys.stderr)
--> 469 raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Could you please let me know how this can be resolved?
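For reference, the only workaround I have come up with so far is to keep the per-tree training loop on the driver instead of calling train_decision_tree inside parallelize(...). Below is a minimal sketch of that idea using a driver-side thread pool; the ThreadPoolExecutor, max_workers value, and the train_all_trees helper are just my assumption of how this might look, not a confirmed fix. Each fit() call is still issued from the driver, so Spark distributes the actual work itself. Is there a better way to truly parallelize the per-tree training?

# Driver-side fallback (assumption): train the trees from the driver with a small thread pool,
# so SparkContext is never referenced inside an RDD transformation.
from concurrent.futures import ThreadPoolExecutor

def train_all_trees(num_trees):
    # Each submitted call runs on the driver; Spark still distributes the underlying fit() jobs.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(train_decision_tree, i) for i in range(num_trees)]
        return [f.result() for f in futures]

models = train_all_trees(num_trees)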