The title almost says it already. I have a pyspark.sql.dataframe.DataFrame with an "ID", "TIMESTAMP", "CONSUMPTION" and "TEMPERATURE" column. I need the "TIMESTAMP" column to be resampled from 15-minute to daily intervals, and the "CONSUMPTION" and "TEMPERATURE" columns to be aggregated by summation. However, this needs to be done for each unique id in the "ID" column. How do I do this?

Efficiency/speed is important to me. I am starting from a huge dataframe, which is why I would like to avoid .toPandas() and for loops.

Any help would be greatly appreciated!

The following code builds a spark dataframe to play around with. input_spark_df represents the input spark dataframe, and the desired output looks like desired_outcome_spark_df.

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

df_list = []
for unique_id in ['012', '345', '678']:
    # Hourly timestamps here to keep the example small; the real data is at 15-minute intervals.
    date_range = pd.date_range(pd.Timestamp('2022-12-28 00:00'), pd.Timestamp('2022-12-30 23:00'), freq='H')
    df = pd.DataFrame()
    df['TIMESTAMP'] = date_range
    df['ID'] = unique_id
    df['TEMPERATURE'] = np.random.randint(1, 10, df.shape[0])
    df['CONSUMPTION'] = np.random.randint(1, 10, df.shape[0])
    df = df[['ID', 'TIMESTAMP', 'TEMPERATURE', 'CONSUMPTION']]
    df_list.append(df)
pandas_df = pd.concat(df_list)

spark = SparkSession.builder.getOrCreate()
input_spark_df = spark.createDataFrame(pandas_df)
# Daily resample per ID, summing the value columns; this is the target output.
desired_outcome_spark_df = spark.createDataFrame(
    pandas_df.set_index('TIMESTAMP').groupby('ID').resample('1d').sum().reset_index()
)

To condense the question: how do I go from input_spark_df to desired_outcome_spark_df as efficiently as possible?


I found the answer to my own question. I first convert the timestamp to a date only (dropping the time of day) using pyspark.sql.functions.to_date. Then I group by both "ID" and "TIMESTAMP" and perform the aggregation.

from pyspark.sql.functions import col, to_date, sum, avg

# Truncate the timestamp to a date, then aggregate per ID and per day.
desired_outcome = (input_spark_df
                   .withColumn('TIMESTAMP', to_date(col('TIMESTAMP')))
                   .groupBy("ID", 'TIMESTAMP')
                   .agg(
                       sum(col("CONSUMPTION")).alias("CUMULATIVE_DAILY_POWER_CONSUMPTION"),
                       avg(col('TEMPERATURE')).alias("AVERAGE_DAILY_TEMPERATURE")
                      ))
desired_outcome.show()
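
Note that the snippet above averages TEMPERATURE, whereas desired_outcome_spark_df in the question sums both columns. If you want to reproduce the pandas reference exactly, sum both instead; a minimal sketch under that assumption, reusing input_spark_df from the question:

from pyspark.sql import functions as F

# Sum both value columns per ID per day, mirroring
# pandas_df.set_index('TIMESTAMP').groupby('ID').resample('1d').sum()
matching_df = (input_spark_df
               .withColumn('TIMESTAMP', F.to_date('TIMESTAMP'))
               .groupBy('ID', 'TIMESTAMP')
               .agg(F.sum('CONSUMPTION').alias('CONSUMPTION'),
                    F.sum('TEMPERATURE').alias('TEMPERATURE')))
matching_df.show()

This stays entirely in Spark, so it avoids .toPandas() and per-ID loops. For resampling intervals other than a whole day, pyspark.sql.functions.window can be used to bucket the timestamps instead of to_date.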