I am working on a personal project that entails creating an AWS Glue job that will do some basic transformations and move it to a DocumentDB database.
The main problem I am having right now is that I am unable to move the data to the DocumentDB database.
The code I have for my Glue job is as follows:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
import time
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
documentdb_uri = "mongodb://mydocumentdb_uri"
documentdb_write_uri = "mongodb://mydocumentdb_uri"
read_docdb_options = {
"uri": documentdb_uri,
"database": "my_db_name",
"collection": "my_collection",
"username": "my_username",
"password": "my_password",
"ssl": "true",
"ssl.domain_match": "false",
}
write_documentdb_options = {
"uri": documentdb_write_uri,
"database": "my_db_name",
"collection": "my_collection",
"username": "my_username",
"password": "my_password"
}
## @type: DataSource
## @args: [database = "my-project-data-database", table_name = "my-project-data-table", redshift_tmp_dir = args["TempDir"], transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: []
S3SourceData = glueContext.create_dynamic_frame.from_catalog(database = "my-project-data-database", table_name = "my-project-data-table")
## @type: DataSink
## @args: [database = "my-project-data-database", table_name = "my-project-table-name", redshift_tmp_dir = "<redshift_tmp_dir>", transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: [frame = <frame>]
data_to_write_to_documentdb = glueContext.write_dynamic_frame.from_catalog(frame = S3SourceData, database = "my-project-data-database", table_name = "my-project-data-table")
## @type: DropFields
## @args: [paths = [<paths>], transformation_ctx = "<transformation_ctx>"]
## @return: <output>
## @inputs: [frame = <frame>]
Transformed_Example = DropFields.apply(frame = S3SourceData, paths = ['Multiple fields to drop here']
)
# Write the frame to DocumentDB
glueContext.write_dynamic_frame.from_options(Transformed_Example, connection_type="documentdb",
connection_options=write_documentdb_options)
job.commit()
This project is just for learning purposes only, so I am not looking to do some major complicated ETL. I just want to take data from an S3 bucket which I have already crawled with a glue crawler and drop some fields and move it to DocumentDB.
I feel like I am missing something fundamental in order to move the transformed data to DocumentDB. But, I am unable to figure it out. I've gone through the docs for Glue and DocumentDB but I am unable to find an example of what I am trying to do, or maybe I am just not understanding the examples which are given.
I've been going at this for almost 10 hours now so if anyone is able to help me out here I would really appreciate it.