This is my scenario.
I have a source database, which is an Aurora database with the PostgreSQL engine, containing a table named payments. The table holds millions of records. At the end of every day I need to read the data and check for any payments that are past their due date; if there are any, I need to mark those payments as "Overdue".
How can I achieve this in an optimized way using an AWS Glue Spark job? Which AWS components can be leveraged to meet this requirement?
Thanks in advance!
Code:
from pyspark.sql import SparkSession
# SparkSession
spark = SparkSession.builder.config("spark.jars", "absolutepath/postgresql-42.7.3.jar") \
.appName("PostgresApp").getOrCreate()
# PostgreSQL connection details
pg_host = "localhost"
pg_port = 5432
pg_database = "test"
pg_user = "test"
pg_password = "test"
# JDBC connection string
jdbc_url = f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_database}?currentSchema=product"
# Load the table into a DataFrame over JDBC
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "product") \
    .option("user", pg_user) \
    .option("password", pg_password) \
    .option("driver", "org.postgresql.Driver") \
    .load()
df.show()  # Display the loaded DataFrame

update_query = """
UPDATE product
SET product = 'product2'
WHERE productid = '1'
"""
print("SQL Query:", update_query)
spark.sql(update_query)  # Attempt to run the UPDATE through Spark SQL
# Close connections to avoid resource leaks
spark.stop()
You would need to use AWS Glue to extract the data from your Aurora PostgreSQL payments table. The awsglue Python package should help: there are many examples in aws-samples/aws-glue-samples, and you can also include a DataDirect JDBC driver to load the table into a DataFrame. From there, filter the records with Spark to identify overdue payments and mark them accordingly, and temporarily save the processed data in an Amazon S3 bucket. Then use AWS Lambda, triggered either manually or on a schedule (via Amazon EventBridge, formerly CloudWatch Events), to read the processed data from S3 and update the original payments table through the RDS Data API (the web-services interface to your Aurora DB cluster):
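A rough sketch of both halves follows; the Glue Data Catalog names, the S3 bucket, the ARNs, and the payment_id/status/due_date column names are placeholder assumptions, not values from your environment. The Glue job part could look like this:

# --- Glue job: read payments, pick out the overdue rows, stage their keys in S3 ---
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F

glue_context = GlueContext(SparkContext.getOrCreate())

# "payments_db" / "payments" are placeholder Data Catalog names pointing at Aurora
payments = glue_context.create_dynamic_frame.from_catalog(
    database="payments_db", table_name="payments"
).toDF()

overdue_keys = (
    payments
    .filter((F.col("due_date") < F.current_date()) & (F.col("status") != "Overdue"))
    .select("payment_id")
)

# Stage only the keys that need updating
overdue_keys.write.mode("overwrite").parquet("s3://my-bucket/overdue-payments/")

and the Lambda function (scheduled through EventBridge) could apply the updates with the RDS Data API, for example via batch_execute_statement (awswrangler would need to be packaged with the function, e.g. as a Lambda layer):

# --- Lambda handler: push the staged updates through the RDS Data API ---
import awswrangler as wr
import boto3

rds_data = boto3.client("rds-data")

CLUSTER_ARN = "arn:aws:rds:...:cluster:my-aurora-cluster"      # placeholder
SECRET_ARN = "arn:aws:secretsmanager:...:secret:my-db-secret"  # placeholder

def handler(event, context):
    # Read the keys staged by the Glue job
    staged = wr.s3.read_parquet("s3://my-bucket/overdue-payments/")
    payment_ids = staged["payment_id"].tolist()

    # One parameterized UPDATE per staged key, sent as a single batch
    rds_data.batch_execute_statement(
        resourceArn=CLUSTER_ARN,
        secretArn=SECRET_ARN,
        database="test",
        sql="UPDATE payments SET status = 'Overdue' WHERE payment_id = :pid",
        parameterSets=[[{"name": "pid", "value": {"longValue": int(pid)}}]
                       for pid in payment_ids],
    )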
Ananth's suggestion to directly update records in the PostgreSQL payments table using an SQL UPDATE statement is a good alternative, especially if you want transactional integrity and want to minimize data movement. It would simplify the process by eliminating the need to temporarily store processed data in S3 and then use AWS Lambda for the updates. The AWS Glue Spark job would execute the SQL update statement directly against the Aurora PostgreSQL database from within the Spark script. Note that Spark itself does not natively execute SQL UPDATE statements against a JDBC data source. Instead, you can use the JDBC connection to execute raw SQL commands, but this requires managing the connection yourself within the Spark job, which is not typical Spark usage and may require additional error handling and performance considerations.
An alternative and more Spark-native approach would be to leverage AWS Glue's capability to run a job that triggers an AWS Lambda function, which in turn performs the SQL update operation.
That keeps the heavy lifting within AWS services optimized for these tasks. It is fairly similar to the first part of this answer.
For the direct approach, make sure your AWS Glue job has access to a PostgreSQL JDBC driver. That might involve uploading the driver to S3 and referencing it in your job's dependencies.
Then use, for instance, PySpark's JDBC capabilities to establish a connection to the database and execute an SQL update statement, as in this example.
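A minimal sketch of that direct update, reusing the SparkSession from the script above and the java.sql.DriverManager exposed through py4j (the Aurora endpoint, the credentials, and the status/due_date column names are placeholders, not values from the question):

# Run one set-based UPDATE through the JDBC driver, directly from the Spark driver process
jdbc_url_aurora = "jdbc:postgresql://<aurora-endpoint>:5432/test"  # placeholder endpoint
db_user = "<user>"          # placeholder
db_password = "<password>"  # placeholder

DriverManager = spark.sparkContext._jvm.java.sql.DriverManager
conn = DriverManager.getConnection(jdbc_url_aurora, db_user, db_password)
try:
    stmt = conn.createStatement()
    # One statement marks every past-due payment in a single round trip
    rows = stmt.executeUpdate(
        "UPDATE payments SET status = 'Overdue' "
        "WHERE due_date < CURRENT_DATE AND status <> 'Overdue'"
    )
    print("Rows marked overdue:", rows)
    stmt.close()
finally:
    conn.close()

Because the whole update is a single set-based statement executed inside the database, no rows have to be moved through Spark at all, which is usually the cheapest way to flip a status flag on millions of records.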
Make sure your AWS Glue job has the necessary permissions to access the Aurora PostgreSQL database and execute updates. That typically involves configuring the database's security group to allow connections from AWS Glue and attaching the appropriate IAM roles. Note that the direct approach above executes the update without explicit transaction management; depending on your use case, you may need to handle transactions more granularly, especially if you execute multiple updates or need rollback functionality.
You are trying to use Spark SQL's spark.sql() method for executing an SQL UPDATE statement, which is not supported directly by Spark SQL. Spark SQL is designed primarily for executing SQL queries that return a DataFrame representing the result set of a SELECT statement, or operations like CREATE TABLE, DROP TABLE, etc., within the Spark SQL context. It does not support direct DML operations (INSERT, UPDATE, DELETE) on external databases like PostgreSQL through the spark.sql() method.
To execute an UPDATE statement on a PostgreSQL database from a Spark job, you would need to use a JDBC connection directly, bypassing Spark SQL for the UPDATE operation. That would involve using the JDBC API to execute your SQL commands, not through Spark SQL but by establishing a direct connection to your PostgreSQL database (using, for instance, psycopg2).
While the UPDATE operation is executed outside the Spark SQL context, you can still use Spark for distributed data processing tasks, including reading from and writing to PostgreSQL with .read.format("jdbc") and .write.format("jdbc").
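A minimal sketch of that split, assuming psycopg2 is available to the Glue job and reusing the connection variables from your script (adjust the JDBC URL to the schema that actually holds payments; the payment_id/status/due_date column names are assumptions):

import psycopg2

# Distributed read: pull only the candidate rows into Spark
overdue_df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("user", pg_user)
    .option("password", pg_password)
    .option("driver", "org.postgresql.Driver")
    .option("query",
            "SELECT payment_id FROM payments "
            "WHERE due_date < CURRENT_DATE AND status <> 'Overdue'")
    .load()
)
overdue_df.show()

# The UPDATE itself goes through a plain psycopg2 connection on the driver
conn = psycopg2.connect(host=pg_host, port=pg_port, dbname=pg_database,
                        user=pg_user, password=pg_password)
with conn, conn.cursor() as cur:  # the connection context commits on success
    cur.execute(
        "UPDATE payments SET status = 'Overdue' "
        "WHERE due_date < CURRENT_DATE AND status <> 'Overdue'"
    )
    print("Rows updated:", cur.rowcount)
conn.close()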