Convert CloudTrail Logs to Parquet with AWS Glue


At a large scale, the CloudTrail (CT) log format proves inefficient, producing an overwhelming 3BN+ records daily. This sheer volume, combined with the JSON format, hampers Athena's performance. To address this, I'm working on an AWS Glue job to consolidate and convert this data into more manageable Parquet files. However, I've encountered an unexpected issue.

This is a snippet of my code:

operation_kwargs = {
  "database": data_lake_database_name,
  "table_name": original_table_name,
  "push_down_predicate": f"region=='{region}' and year=='{year}' and month=='{month}' and day=='{day}'",
  "catalogPartitionPredicate": f"region='{region}' and year='{year}' and month='{month}' and day='{day}'",
  "transformation_ctx": "raw_data",
}

raw_data = glue_context.create_dynamic_frame_from_catalog(**operation_kwargs)

raw_data = raw_data.repartition(100)

print(raw_data.toDF().show(5))

When I execute this, the output shows only the partition columns and a single column named Records. I suspect this is due to the structure of the CloudTrail (CT) files. For context, I created the Glue table for CT by following this AWS guide: Querying AWS CloudTrail logs

Here's a snapshot of the DataFrame (DF) I'm working with:

+--------------------+---------+----+-----+---+
|             Records|   region|year|month|day|
+--------------------+---------+----+-----+---+
|[{1.09, {AssumedR...|us-east-1|2023|   10| 03|
|[{1.09, {AWSServi...|us-east-1|2023|   10| 03|
|[{1.09, {AWSServi...|us-east-1|2023|   10| 03|
|[{1.09, {AWSServi...|us-east-1|2023|   10| 03|
|[{1.09, {AWSServi...|us-east-1|2023|   10| 03|
+--------------------+---------+----+-----+---+

only showing top 5 rows
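
If it helps, this is roughly how the nested structure can be inspected from the DynamicFrame above (nothing beyond printSchema and a Spark SQL peek at the first element of Records):

df = raw_data.toDF()

# Records comes through as an array of structs, one element per CloudTrail event
df.printSchema()

# Peek at the first event inside the array (bracket indexing is 0-based in Spark SQL)
df.selectExpr("Records[0] AS first_event").show(1, truncate=False)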

What am I doing wrong here?
I feel that this should not be that hard out of the box, since the table has the correct SerDe and is queryable from Athena.


There is 1 answer below.

Answer from JohnBegood:

Presumably you have already taken the additional steps to set up the AWS Glue ETL job correctly, and you have also set up the AWS Glue Crawler.

In the ETL job, you will need to write a transformation script in Spark or PySpark that reads the CloudTrail logs and converts them to Parquet. At a high level, the script could look like this:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Get the arguments from the command line
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Create DynamicFrame from CloudTrail data
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database="your_database_name", table_name="your_table_name")

# Transform the data (e.g., select columns, apply filters, etc.)
transformed_dynamic_frame = dynamic_frame  # placeholder: apply your transformation here (see the sketch after this script)

# Write the transformed data to S3 in Parquet format
glueContext.write_dynamic_frame.from_catalog(transformed_dynamic_frame, database="your_database_name", table_name="output_parquet_table")

job.commit()
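
For this particular question, the transformation step is the part that matters: each CloudTrail log file is a single JSON document with a top-level Records array, so every row of the DynamicFrame is one file rather than one event. A minimal sketch of that step, assuming the dynamic_frame and glueContext from the script above, the region/year/month/day partition columns shown in the question, and a placeholder S3 output path, could look like this:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, explode

# Explode the Records array so each CloudTrail event becomes its own row,
# carrying the partition columns along with it.
events_df = (
    dynamic_frame.toDF()
    .select(explode(col("Records")).alias("event"), "region", "year", "month", "day")
    .select("event.*", "region", "year", "month", "day")
)

# Convert back to a DynamicFrame and write Parquet to S3, partitioned like the source.
transformed_dynamic_frame = DynamicFrame.fromDF(events_df, glueContext, "events")

glueContext.write_dynamic_frame.from_options(
    frame=transformed_dynamic_frame,
    connection_type="s3",
    connection_options={
        "path": "s3://your-output-bucket/cloudtrail-parquet/",  # placeholder path
        "partitionKeys": ["region", "year", "month", "day"],
    },
    format="parquet",
)

Writing with from_options straight to S3 avoids needing a pre-existing output table in the Data Catalog; if you prefer write_dynamic_frame.from_catalog as in the script above, the target Parquet table has to exist in the catalog first.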