AWS S3 Parquet data lake: How best to deploy an aggregation Python script


I am looking to perform a daily 'aggregation' of a large Parquet data lake stored on AWS S3. I have set up the data lake so that it can be queried via Athena. My structure is as follows:

s3://bucketname/deviceid/message/yyyy/mm/dd/xyz.parquet

The data comes from vehicle data loggers; there can easily be 500+ devices and terabytes of data in the data lake. For a single day, there may be up to 1 GB per device.

For some use cases I need to analyze the data at a "vehicle trip" level, and in order to query this efficiently I am looking to create a workflow where a Python script performs a daily aggregation of the last day's worth of data.

I have managed to set this up and test it locally (storing my data lake next to my script), but I would now like to run it directly against my S3 data lake - and ideally deploy the script in a way that can be automated. The aggregation tables should be written as daily Parquet files to a separate folder in the same S3 bucket that contains the data lake.
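
To make it concrete, the read/write pattern I have in mind for running directly against S3 looks roughly like this (only a sketch; it assumes pyarrow's S3 filesystem support, and the region, device ID, and dates are placeholders):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as pafs
import pyarrow.parquet as pq

# Region is a placeholder; credentials come from the environment/role
s3 = pafs.S3FileSystem(region="eu-west-1")

# Read one device/message/day prefix straight from S3 into a pandas DataFrame
dataset = ds.dataset(
    "bucketname/2A896980/CAN2_gnss_speed/2023/05/26/",
    filesystem=s3,
    format="parquet",
)
df = dataset.to_table().to_pandas()

# ... same trip-window detection and aggregation logic as in the local script ...

# Write the daily aggregation table back to a separate prefix in the same bucket
results_table = pa.Table.from_pandas(df)  # in practice: the aggregated results
pq.write_table(
    results_table,
    "bucketname/aggregations/tripsummary/2023/05/26/20230526.parquet",
    filesystem=s3,
)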

In this regard, I am unsure about two things:

  1. I am considering deploying the script as a Glue job because I already have Glue set up for my Athena purposes (mapping my data lake), so it seems like a good way to reuse existing infrastructure. However, I am unsure whether this is a sensible way to deploy a script like this, or whether another approach would be superior. I aim to minimize costs, while also ensuring that the deployment works for both 1 device and 1000 devices. (I have added a rough sketch of how I imagine creating and scheduling such a job after my script below.)

  2. If I deploy it as a Glue job, should I use pyarrow (as in my local script below) to load the Parquet files during the aggregation, or would it be faster/cheaper to use Athena SQL queries instead? My concern with pyarrow is mainly whether there is a lot of wasted overhead from "downloading" files into the Glue job environment, whereas this seems abstracted away with Athena. On the other hand, I fear an Athena query loop will add a lot of code complexity that may be best avoided - and perhaps it would not even be superior in terms of speed/cost. (I have sketched what I imagine such a query loop would look like right after this list.)
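
For context on question 2, this is roughly the kind of query loop I imagine would be needed with Athena (a minimal sketch using boto3; the database, table, column, and partition names are placeholders, not my actual Glue catalog):

import time
import boto3

athena = boto3.client("athena")

def run_athena_query(sql, database, output_location):
    # Start the query and poll until it reaches a terminal state
    execution = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = execution["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return query_id, state
        time.sleep(2)

# Placeholder table/column/partition names
sql = """
    SELECT AVG("Speed") AS avg_speed
    FROM "my_database"."can2_gnss_speed"
    WHERE deviceid = '2A896980'
      AND yyyy = '2023' AND mm = '05' AND dd = '26'
"""
query_id, state = run_athena_query(sql, "my_database", "s3://bucketname/athena-results/")
print(query_id, state)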

Below is my local script:

aggregations.json file:

{
  "device_clusters": [
    {
      "devices": [
        "2A896980",
        "2F6913DB",
        "18D508A1"
      ],
      "cluster": "cluster1"
    },
    {
      "devices": [
        "12345678"
      ],
      "cluster": "cluster2"
    }
  ],
  "cluster_details": [
    {
      "cluster1": {
        "trip_identifier": {
          "message": "CAN2_gnss_speed",
          "signal": [
            "Speed"
          ]
        },
        "aggregations": [
          {
            "message": "CAN2_gnss_speed",
            "signal": [
              "Speed"
            ],
            "aggregation": "avg"
          },
          {
            "message": "CAN2_gnss_altitude",
            "signal": [
              "Altitude"
            ],
            "aggregation": "max"
          },
          {
            "message": "CAN2_gnss_imu",
            "signal": [
              "AccelerationX",
              "AccelerationY",
              "AccelerationZ"
            ],
            "aggregation": "avg"
          }
        ]
      }
    }
  ]
}

Python script for aggregation:

import json
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from datetime import datetime, timedelta

# Define output table name
table_name = "tripsummary"

# Load JSON configuration
with open('aggregations.json', 'r') as file:
    config = json.load(file)

# Map devices to their clusters
device_to_cluster = {device: cluster_data['cluster']
                     for cluster_data in config['device_clusters']
                     for device in cluster_data['devices']}


# Base path of the data lake
data_lake_path = 'datalake'

def get_trip_windows(directory_path, signal):
    # Detect trip windows by looking for gaps of more than 10 minutes in the
    # timestamp column of the trip-identifier message.
    # Note: the 'signal' argument is currently unused.
    try:
        files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.parquet')]
        dfs = [pq.read_table(f).to_pandas() for f in files]
        df = pd.concat(dfs, ignore_index=True)

        df['t'] = pd.to_datetime(df['t'])
        df.sort_values(by='t', inplace=True)
        df['time_diff'] = df['t'].diff()

        trip_starts = df[df['time_diff'] > pd.Timedelta(minutes=10)]['t'].tolist()
        if df['t'].iloc[0] not in trip_starts:
            trip_starts.insert(0, df['t'].iloc[0])

        # Start each trip one second before its first sample
        trip_starts = [ts - pd.Timedelta(seconds=1) for ts in trip_starts]

        # Each trip ends one second before the next (shifted) trip start;
        # the final trip ends one second after the last sample
        trip_ends = [ts - pd.Timedelta(seconds=1) for ts in trip_starts[1:]]
        trip_ends.append(df['t'].iloc[-1] + pd.Timedelta(seconds=1))

        return list(zip(trip_starts, trip_ends))
    except Exception as e:
        print(f"Error processing for trip windows in directory {directory_path}: {e}")
        return []



def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days) + 1):
        yield start_date + timedelta(n)

def process_directory_for_trip(directory_path, device_id, message, signals, aggregation_type, trip_window, cluster_name):
    try:
        files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.parquet')]
        dfs = [pq.read_table(f).to_pandas() for f in files]
        df = pd.concat(dfs, ignore_index=True)

        # Ensure 't' is a datetime before comparing against the trip window bounds
        df['t'] = pd.to_datetime(df['t'])
        df = df[(df['t'] >= trip_window[0]) & (df['t'] <= trip_window[1])]
        results = []

        for signal in signals:
            if signal not in df.columns:
                continue
            result_value = None
            if aggregation_type == 'avg':
                result_value = df[signal].mean()
            elif aggregation_type == 'max':
                result_value = df[signal].max()
            elif aggregation_type == 'min':
                result_value = df[signal].min()
            elif aggregation_type == 'sum':
                result_value = df[signal].sum()
            count = df[signal].count()
            duration = (df['t'].max() - df['t'].min()).total_seconds()

            trip_id = f"{device_id}_{int(trip_window[0].timestamp())}"
            trip_start = int(trip_window[0].timestamp())
            trip_end = int(trip_window[1].timestamp())

            if result_value is not None:
                results.append([device_id, message, signal, aggregation_type, result_value, count, duration, trip_start, trip_end, trip_id, cluster_name])
    
        return results
    except Exception as e:
        print(f"Error processing directory {directory_path} for trip: {e}")
        return []


def write_results_to_parquet(results, date, table_name):
    
    if not results:
        return
    df = pd.DataFrame(results, columns=['DeviceID', 'Message', 'Signal', 'Aggregation', 'Value', 'Count', 'Duration', 'TripStart', 'TripEnd', 'TripID', 'Cluster'])
    date_path = date.strftime("%Y/%m/%d")
    output_path = os.path.join(data_lake_path, 'aggregations', table_name, date_path)
    os.makedirs(output_path, exist_ok=True)
    file_path = os.path.join(output_path, f"{date.strftime('%Y%m%d')}.parquet")
    table = pa.Table.from_pandas(df)
    pq.write_table(table, file_path)

def process_data_lake(start_date, end_date):
    for single_date in daterange(start_date, end_date):
        daily_results = []
        for device_id, cluster in device_to_cluster.items():
            date_path = single_date.strftime("%Y/%m/%d")
            cluster_detail = next((detail for detail in config['cluster_details'] if cluster in detail), None)
            
            if cluster_detail:
                trip_identifier = cluster_detail[cluster].get('trip_identifier', {})
                if 'message' in trip_identifier and 'signal' in trip_identifier:
                    trip_directory_path = os.path.join(data_lake_path, device_id, trip_identifier.get('message', ''), date_path)
                    if os.path.exists(trip_directory_path):
                        trip_windows = get_trip_windows(trip_directory_path, trip_identifier['signal'][0])
                        cluster_aggregations = cluster_detail[cluster].get('aggregations', [])
                        for trip_window in trip_windows:
                            for agg in cluster_aggregations:
                                directory_path = os.path.join(data_lake_path, device_id, agg['message'], date_path)
                                if os.path.exists(directory_path):
                                    directory_results = process_directory_for_trip(
                                        directory_path, device_id, agg['message'], agg['signal'], agg['aggregation'], trip_window, cluster)
                                    if directory_results:
                                        daily_results.extend(directory_results)

        if daily_results:
            write_results_to_parquet(daily_results, single_date, table_name)



# Define the time period for data processing
# (wide window here for backfilling; a daily run would use timedelta(days=1))
end_date = datetime.today()
start_date = end_date - timedelta(days=700)

# Test on specific date
# end_date = datetime(2023, 5, 26)
# start_date = datetime(2023, 5, 26)

# Process the data lake
process_data_lake(start_date, end_date)
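
For reference on question 1, this is roughly how I imagine creating and scheduling the job (a sketch using boto3's Glue client; the job name, role ARN, script location, Python version, and cron schedule are all placeholders):

import boto3

glue = boto3.client("glue")

# Create a Python shell Glue job that runs the aggregation script stored in S3
glue.create_job(
    Name="daily-trip-aggregation",
    Role="arn:aws:iam::123456789012:role/MyGlueJobRole",
    Command={
        "Name": "pythonshell",              # lightweight Python job, no Spark cluster
        "ScriptLocation": "s3://bucketname/scripts/aggregate_trips.py",
        "PythonVersion": "3.9",
    },
    MaxCapacity=0.0625,                      # smallest DPU setting for Python shell jobs
)

# Schedule it to run once per day, shortly after midnight UTC
glue.create_trigger(
    Name="daily-trip-aggregation-trigger",
    Type="SCHEDULED",
    Schedule="cron(30 2 * * ? *)",
    Actions=[{"JobName": "daily-trip-aggregation"}],
    StartOnCreation=True,
)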
