How to access the Spark context or pandas inside a worker node to create a DataFrame?


For example,

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit

# Create a Spark session
spark = SparkSession.builder.appName("BroadcastDataFrameExample").getOrCreate()

# Create a DataFrame with filter conditions
data = [('o_totalprice < 0', 'Expression')]

# Create a DataFrame with the specified columns and values
conditions_df = spark.createDataFrame(data, ["ConditionExpression", "ConditionType"])

# Dummy data
data = [
    (13710944, 227285, 'O', 162169.66, '1995-10-11', '1-URGENT', 'Clerk#000000432', 0, "accounts. ruthless instructions wake blithely"),
    (13710945, 225010, 'O', -302312.54, '1997-09-29', '5-LOW', 'Clerk#000002337', 0, "ironic platelets nag slyly"),
    (13710946, 238820, 'O', 179947.16, '1997-10-31', '2-HIGH', 'Clerk#000004135', 0, "ole requests. regular, regular instructi"),
    (13710947, 581233, 'O', 33843.49, '1995-05-25', '2-HIGH', 'Clerk#000000138', 0, "arefully final plates. slyly bold ide"),
]

# Create a DataFrame with the specified columns and values
dummy_df = spark.createDataFrame(data, [
    "o_orderkey", "o_custkey", "o_orderstatus", "o_totalprice", 
    "o_orderdate", "o_orderpriority", "o_clerk", "o_shippriority", "o_comment"
])


# Display the original DataFrame
print("Original DataFrame:")
conditions_df.show()

# Create a Pandas DataFrame from the original DataFrame
pandas_df = conditions_df.toPandas()

# Display the Pandas DataFrame
print("Pandas DataFrame:")
print(pandas_df)

# Broadcast the Pandas DataFrame
broadcasted_df = spark.sparkContext.broadcast(pandas_df)

# Function to apply filter conditions using the broadcasted DataFrame
def apply_conditions(iterator, broadcasted_df):

    pandas_df = broadcasted_df.value

    # Convert the partition iterator to a Pandas DataFrame
    # partition_df = pd.DataFrame(iterator, columns=dummy_df.columns)
    # NOTE: the next line references the driver-side SparkSession from inside
    # the worker function, which is what triggers the error shown further below.
    partition_df = spark.createDataFrame(iterator, schema=dummy_df.schema)

    filtered_rows = []

    for index, condition_row in pandas_df.iterrows():
        condition_type = condition_row["ConditionType"]
        condition_expression = condition_row["ConditionExpression"]

        # Apply condition_expression to the entire partition_df
        if condition_type == "Expression":
            condition_result = partition_df.filter(condition_expression)
            filtered_rows.extend(condition_result.to_dict(orient='records'))

    yield filtered_rows

# Apply filter conditions using mapPartitions
filtered_rdd = dummy_df.rdd.mapPartitions(lambda iterator: apply_conditions(iterator, broadcasted_df))

# Convert the filtered RDD to a DataFrame
filtered_df = spark.createDataFrame(filtered_rdd.flatMap(lambda x: x), schema=dummy_df.schema)

# Display the filtered DataFrame
print("Filtered DataFrame:")
filtered_df.show()

The main idea is to use the filter conditions specified in the broadcasted Pandas DataFrame to filter the dummy_df DataFrame based on the condition type "Expression". The filter conditions are applied using mapPartitions, which operates on each partition of the DataFrame, and the filtered results are collected into a new DataFrame.

apply_conditions tries to access the Spark context from inside the worker node, resulting in the error below.

pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. PicklingError: Could not serialize object: IndexError: tuple index out of rang
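As far as I can tell, merely referencing the driver-side session inside the function is enough to trigger this. I believe a stripped-down version like the one below (my own naming, kept only to isolate the cause) hits the same serialization failure, since the closure still captures spark:

# The function does nothing useful; it only references the driver-side `spark`.
def touch_session(iterator):
    _ = spark.sparkContext.applicationId  # driver-only object captured in the closure
    return iterator

# Shipping this closure to the executors fails with the same
# CONTEXT_ONLY_VALID_ON_DRIVER / PicklingError:
# dummy_df.rdd.mapPartitions(touch_session).collect()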

Expected output - the filter should return the row that has the negative o_totalprice:

(13710945, 225010, 'O', -302312.54, '1997-09-29', '5-LOW', 'Clerk#000002337', 0, "ironic platelets nag slyly"),
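The condition strings are plain Spark SQL, so on the driver the expected row can be selected directly with DataFrame.filter, for example:

# Driver-side check: collect the (small) conditions table and apply each
# expression with DataFrame.filter, which accepts SQL expression strings.
for cond in conditions_df.collect():
    if cond["ConditionType"] == "Expression":
        dummy_df.filter(cond["ConditionExpression"]).show()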

Help me fix this error.
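For what it's worth, the direction I was considering is to rebuild each partition as a plain pandas DataFrame on the worker, so that only the broadcast variable and the partition's rows are needed there and the Spark session is never touched. A rough sketch of that idea follows; it assumes pandas.DataFrame.query can evaluate the condition strings (true for 'o_totalprice < 0', but an assumption in general). Is this the right way to go?

import pandas as pd

def apply_conditions_pandas(iterator, broadcasted_df):
    conditions = broadcasted_df.value

    # Row objects expose asDict(), so the partition can be rebuilt as a plain
    # pandas DataFrame without touching the Spark session on the worker.
    partition_df = pd.DataFrame([row.asDict() for row in iterator])
    if partition_df.empty:
        return

    for _, condition_row in conditions.iterrows():
        if condition_row["ConditionType"] == "Expression":
            # Assumes the expression is valid pandas query syntax.
            matched = partition_df.query(condition_row["ConditionExpression"])
            for record in matched.to_dict(orient="records"):
                yield record

filtered_rdd = dummy_df.rdd.mapPartitions(
    lambda it: apply_conditions_pandas(it, broadcasted_df)
)
filtered_df = spark.createDataFrame(filtered_rdd, schema=dummy_df.schema)
filtered_df.show()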
