I am looking to perform a daily 'aggregation' of a large Parquet data lake stored on AWS S3. I have set up the data lake so that it can be queried via Athena. My structure is as follows:
s3://bucketname/deviceid/message/yyyy/mm/dd/xyz.parquet
The data comes from vehicle data loggers; there can easily be 500+ devices and terabytes of data in the data lake, with up to 1 GB per device for a single day.
For some use cases I need to analyze the data at a "vehicle trip" level. To query this efficiently, I want to create a workflow where a Python script performs a daily aggregation of the previous day's data.
I have managed to set this up and test it locally (with the data lake stored next to my script), but I would now like to run it directly against my S3 data lake, and ideally deploy the script so it can be automated. The aggregation tables should be written as daily Parquet files to a separate folder in the same S3 bucket that holds the data lake.
In this regard, I am in doubt about two things:
1. I am considering deploying the script as a Glue job, because I already have Glue set up for my Athena purposes (mapping my data lake), so it seems like a good way to reuse existing infrastructure. However, I am unsure whether this is a sensible way to deploy a script like this, or whether another approach is superior. I aim to minimize costs while ensuring that the deployment works for both 1 device and 1,000 devices. What I have in mind is sketched right below.
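If I go the Glue route, my current idea is a small Python shell job plus a scheduled trigger, roughly like this (untested sketch: the job name, IAM role and script path are placeholders, and I am assuming the Python shell "analytics" library set covers pandas/pyarrow):

import boto3

glue = boto3.client("glue")

# Register the aggregation script as a small Python shell job
# (0.0625 DPU is the smallest capacity for Python shell jobs)
glue.create_job(
    Name="daily-trip-aggregation",                      # placeholder job name
    Role="arn:aws:iam::123456789012:role/GlueJobRole",  # placeholder IAM role
    Command={
        "Name": "pythonshell",
        "PythonVersion": "3.9",
        "ScriptLocation": "s3://bucketname/scripts/trip_aggregation.py",  # placeholder path
    },
    MaxCapacity=0.0625,
    # The "analytics" library set should provide pandas/pyarrow/boto3 for Python shell 3.9
    DefaultArguments={"library-set": "analytics"},
)

# Run the job once per day on a cron schedule (02:30 UTC here, adjust as needed)
glue.create_trigger(
    Name="daily-trip-aggregation-schedule",
    Type="SCHEDULED",
    Schedule="cron(30 2 * * ? *)",
    Actions=[{"JobName": "daily-trip-aggregation"}],
    StartOnCreation=True,
)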
2. If I deploy it as a Glue job, should I use pyarrow (as in my local script below) to load the Parquet files during the aggregation, or would it be faster/cheaper to use Athena SQL queries? My main concern with pyarrow is whether there is a lot of wasted overhead from "downloading" files into the Glue job environment, whereas this seems abstracted away with Athena. On the other hand, I fear that an Athena query loop will add code complexity that is best avoided, and perhaps it would not be superior in terms of speed or cost anyway.
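For reference, this is roughly how I imagine the pyarrow part would look when pointed at S3 instead of local disk (untested; the region and the device/message/date values are only examples). As far as I understand, the dataset API only fetches the requested columns rather than whole files:

import pyarrow.dataset as ds
import pyarrow.fs as pafs

# Read a single device/message/day partition straight from S3;
# only the requested columns are downloaded, not the whole files
s3 = pafs.S3FileSystem(region="eu-west-1")  # example region
dataset = ds.dataset(
    "bucketname/2A896980/CAN2_gnss_speed/2023/05/26",  # bucket/prefix, no "s3://" scheme when a filesystem is passed
    format="parquet",
    filesystem=s3,
)
df = dataset.to_table(columns=["t", "Speed"]).to_pandas()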
Below is my local script:
aggregations.json file:
{
"device_clusters": [
{
"devices": [
"2A896980",
"2F6913DB",
"18D508A1"
],
"cluster": "cluster1"
},
{
"devices": [
"12345678"
],
"cluster": "cluster2"
}
],
"cluster_details": [
{
"cluster1": {
"trip_identifier": {
"message": "CAN2_gnss_speed",
"signal": [
"Speed"
]
},
"aggregations": [
{
"message": "CAN2_gnss_speed",
"signal": [
"Speed"
],
"aggregation": "avg"
},
{
"message": "CAN2_gnss_altitude",
"signal": [
"Altitude"
],
"aggregation": "max"
},
{
"message": "CAN2_gnss_imu",
"signal": [
"AccelerationX",
"AccelerationY",
"AccelerationZ"
],
"aggregation": "avg"
}
]
}
}
]
}
Python script for aggregation:
import json
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from datetime import datetime, timedelta
import sys
# Define output table name
table_name = "tripsummary"
# Load JSON configuration
with open('aggregations.json', 'r') as file:
config = json.load(file)
# Map devices to their clusters
device_to_cluster = {device: cluster_data['cluster']
for cluster_data in config['device_clusters']
for device in cluster_data['devices']}
# Base path of the data lake
data_lake_path = 'datalake'
def get_trip_windows(directory_path, signal):
try:
files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.parquet')]
dfs = [pq.read_table(f).to_pandas() for f in files]
df = pd.concat(dfs, ignore_index=True)
df['t'] = pd.to_datetime(df['t'])
df.sort_values(by='t', inplace=True)
df['time_diff'] = df['t'].diff()
trip_starts = df[df['time_diff'] > pd.Timedelta(minutes=10)]['t'].tolist()
if df['t'].iloc[0] not in trip_starts:
trip_starts.insert(0, df['t'].iloc[0])
# Subtract one second from the start of each trip
trip_starts = [ts - pd.Timedelta(seconds=1) for ts in trip_starts]
        # Each trip ends one second before the next trip's (shifted) start;
        # the final trip ends one second after the last sample
        trip_ends = [ts - pd.Timedelta(seconds=1) for ts in trip_starts[1:]]
        trip_ends.append(df['t'].iloc[-1] + pd.Timedelta(seconds=1))
return list(zip(trip_starts, trip_ends))
except Exception as e:
print(f"Error processing for trip windows in directory {directory_path}: {e}")
return []
def daterange(start_date, end_date):
for n in range(int((end_date - start_date).days) + 1):
yield start_date + timedelta(n)
def process_directory_for_trip(directory_path, device_id, message, signals, aggregation_type, trip_window, cluster_name):
try:
files = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.parquet')]
dfs = [pq.read_table(f).to_pandas() for f in files]
df = pd.concat(dfs, ignore_index=True)
        # Ensure 't' is a datetime so it compares correctly against the trip window bounds
        df['t'] = pd.to_datetime(df['t'])
        df = df[(df['t'] >= trip_window[0]) & (df['t'] <= trip_window[1])]
results = []
for signal in signals:
if signal not in df.columns:
continue
result_value = None
if aggregation_type == 'avg':
result_value = df[signal].mean()
elif aggregation_type == 'max':
result_value = df[signal].max()
elif aggregation_type == 'min':
result_value = df[signal].min()
elif aggregation_type == 'sum':
result_value = df[signal].sum()
count = df[signal].count()
duration = (df['t'].max() - df['t'].min()).total_seconds()
trip_id = f"{device_id}_{int(trip_window[0].timestamp())}"
trip_start = int(trip_window[0].timestamp())
trip_end = int(trip_window[1].timestamp())
if result_value is not None:
results.append([device_id, message, signal, aggregation_type, result_value, count, duration, trip_start, trip_end, trip_id, cluster_name])
return results
except Exception as e:
print(f"Error processing directory {directory_path} for trip: {e}")
return []
def write_results_to_parquet(results, date, table_name):
if not results:
return
df = pd.DataFrame(results, columns=['DeviceID', 'Message', 'Signal', 'Aggregation', 'Value', 'Count', 'Duration', 'TripStart', 'TripEnd', 'TripID', 'Cluster'])
date_path = date.strftime("%Y/%m/%d")
output_path = os.path.join(data_lake_path, 'aggregations', table_name, date_path)
os.makedirs(output_path, exist_ok=True)
file_path = os.path.join(output_path, f"{date.strftime('%Y%m%d')}.parquet")
table = pa.Table.from_pandas(df)
pq.write_table(table, file_path)
def process_data_lake(start_date, end_date):
for single_date in daterange(start_date, end_date):
daily_results = []
for device_id, cluster in device_to_cluster.items():
date_path = single_date.strftime("%Y/%m/%d")
cluster_detail = next((detail for detail in config['cluster_details'] if cluster in detail), None)
if cluster_detail:
trip_identifier = cluster_detail[cluster].get('trip_identifier', {})
if 'message' in trip_identifier and 'signal' in trip_identifier:
trip_directory_path = os.path.join(data_lake_path, device_id, trip_identifier.get('message', ''), date_path)
if os.path.exists(trip_directory_path):
trip_windows = get_trip_windows(trip_directory_path, trip_identifier['signal'][0])
cluster_aggregations = cluster_detail[cluster].get('aggregations', [])
for trip_window in trip_windows:
for agg in cluster_aggregations:
directory_path = os.path.join(data_lake_path, device_id, agg['message'], date_path)
if os.path.exists(directory_path):
directory_results = process_directory_for_trip(
directory_path, device_id, agg['message'], agg['signal'], agg['aggregation'], trip_window, cluster)
if directory_results:
daily_results.extend(directory_results)
if daily_results:
write_results_to_parquet(daily_results, single_date, table_name)
# Define the time period for data processing
end_date = datetime.today()
start_date = end_date - timedelta(days=700)
# Test on specific date
# end_date = datetime(2023, 5, 26)
# start_date = datetime(2023, 5, 26)
# Process the data lake
process_data_lake(start_date, end_date)
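Finally, for writing the aggregation output directly to S3 rather than a local folder, I assume write_results_to_parquet would change to roughly the following (untested; the bucket name is a placeholder):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs as pafs

def write_results_to_s3(results, date, table_name, bucket="bucketname"):
    # Same columns as the local version, but the file goes straight to S3
    # under aggregations/<table_name>/yyyy/mm/dd/
    if not results:
        return
    df = pd.DataFrame(results, columns=['DeviceID', 'Message', 'Signal', 'Aggregation', 'Value', 'Count',
                                        'Duration', 'TripStart', 'TripEnd', 'TripID', 'Cluster'])
    key = f"{bucket}/aggregations/{table_name}/{date.strftime('%Y/%m/%d')}/{date.strftime('%Y%m%d')}.parquet"
    pq.write_table(pa.Table.from_pandas(df), key, filesystem=pafs.S3FileSystem())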