How to update data when loading it between two S3 buckets using AWS Glue?

This is my first data analytics project. I'm working on a data pipeline on AWS, and the pipeline steps should be as follows:

  1. Export data from RDS to S3 in parquet format (Done).
  2. Query data in S3 using Athena (Done).
  3. Update the invalid data and transfer it to a new S3 bucket.
  4. Query the cleaned data from the second S3 bucket.

I'm stuck at step 3. In this step, a validation team must query the data in the first S3 bucket, filter out the valid records, update the invalid records, and copy the result to the second S3 bucket.

One constraint is that the old, invalid data must remain as-is in the first bucket; the updates must be applied during the transfer.

Is there a way to use the UPDATE statement in Athena?

Could we run the required UPDATE statement in an AWS Glue job?

I tried the following script from this question to update the data:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database='{{database}}',table_name='{{table_name}}'
df = dyF.toDF()
df.registerTempTable('{{name}}')
df = sqlContext.sql("{{sql_query_on_above_created_table}}")
df.format('parquet').save('{{s3_location}}')

But I got this error:

SyntaxError: invalid syntax (Untitled job.py, line 7)

I read about using AWS Data Pipeline with EMR, but it seems complicated, and I can't see how it would serve on-demand queries.

What is the best way to update the data while transferring it between two S3 buckets, keeping the old version as-is in the first bucket and putting the new data in the second bucket?

There is 1 answer below.

A couple of options come to mind for step 3.

First, Athena doesn't support UPDATE statements, but you could likely use a CTAS (CREATE TABLE AS SELECT) query:

CREATE TABLE new_table
WITH (
      format = 'Parquet',
      external_location = 's3://my-other-bucket/',
      write_compression = 'SNAPPY')
AS SELECT *, plusSomeTransformations
FROM existing_table;
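
If you want to run that CTAS query on demand from code rather than from the Athena console, a minimal boto3 sketch could look like the following (the region, database name, and result location below are placeholders, not details from your setup):

import time

import boto3

athena = boto3.client('athena', region_name='us-east-1')  # placeholder region

# Submit the CTAS query; Athena still needs a scratch location for query metadata
response = athena.start_query_execution(
    QueryString="""
        CREATE TABLE new_table
        WITH (
              format = 'Parquet',
              external_location = 's3://my-other-bucket/',
              write_compression = 'SNAPPY')
        AS SELECT *, plusSomeTransformations   -- same placeholder as the CTAS above
        FROM existing_table
    """,
    QueryExecutionContext={'Database': 'myDB'},                        # placeholder database
    ResultConfiguration={'OutputLocation': 's3://my-query-results/'})  # placeholder scratch bucket

# Poll until the query finishes
query_id = response['QueryExecutionId']
while True:
    status = athena.get_query_execution(QueryExecutionId=query_id)
    state = status['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(2)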

Or with Glue, as you alluded to in your question:

from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from awsglue.context import GlueContext

# Build the Glue context first, then derive the Spark session and SQL context from it
glueContext = GlueContext(SparkContext.getOrCreate())
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)

# Read the existing table (first bucket) from the Glue Data Catalog
dyF = glueContext.create_dynamic_frame.from_catalog(database='myDB', table_name='existing_table')
df = dyF.toDF()

# Apply the "update" as a SQL transformation over a temporary view
df.createOrReplaceTempView('myTable')
df = sqlContext.sql("SELECT *, plusSomeTransformations FROM myTable")

# Write the transformed copy to the second bucket in Parquet format
df.write.format('parquet').save('s3://my-other-bucket/')
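
If you prefer to keep the write inside Glue's own API instead of the plain Spark writer, a rough alternative sketch (reusing df and glueContext from the script above, with the same placeholder bucket) would be:

from awsglue.dynamicframe import DynamicFrame

# Convert the transformed DataFrame back into a DynamicFrame
cleaned_dyF = DynamicFrame.fromDF(df, glueContext, 'cleaned')

# Write the cleaned copy to the second bucket; the first bucket is never written to
glueContext.write_dynamic_frame.from_options(
    frame=cleaned_dyF,
    connection_type='s3',
    connection_options={'path': 's3://my-other-bucket/'},
    format='parquet')

Either way, the job only reads from the first bucket and writes a transformed copy to the second one, so the original data stays as-is.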