How to iterate through a Glue DynamicFrame


Hi, I am working with AWS Glue and Spark. I am reading a DynamoDB table and creating a DynamicFrame from it. I want to send all the data from that table, record by record, to SQS. I have seen a suggestion to convert the DynamicFrame to a Spark DataFrame first, but this table has millions of records and the conversion could take a while. I want to send all the records in the DynamicFrame straight to the SQS queue.

Here is my code:

import sys

import boto3
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

sqs = boto3.resource('sqs')

sqs_queue_url = f"https://sqs.us-east-1.amazonaws.com/{account_id}/my-stream-queue"
queue = sqs.Queue(sqs_queue_url)

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)

## @params: [JOB_NAME]
job.init(args['JOB_NAME'], args)

logger = glueContext.get_logger()

# Read the DynamoDB table directly into a DynamicFrame
df = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "my_table",
        "dynamodb.throughput.read.percent": "1.5",
        "dynamodb.splits": "500"
    },
    numSlots=2368)

job.commit()

# iterate over dynamic frame and send each record over the sqs queue

for record in df:
    queue.send_message(MessageBody=record)
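
A DynamicFrame is not a plain Python iterable, so the loop above will not run as written. One possible approach, sketched here with the same account_id and queue assumed (not tested at scale), is to convert to a Spark DataFrame and let each partition send its own messages from the executors via foreachPartition, so nothing has to be collected on the driver:

def send_partition(rows):
    # Imports and the client live on the executor; boto3 clients are not picklable
    import json
    import boto3
    sqs_client = boto3.client('sqs', region_name='us-east-1')
    url = f"https://sqs.us-east-1.amazonaws.com/{account_id}/my-stream-queue"
    for row in rows:
        # default=str keeps DynamoDB types such as Decimal JSON-serializable
        sqs_client.send_message(QueueUrl=url,
                                MessageBody=json.dumps(row.asDict(), default=str))

# foreachPartition is an action, so the sends run in parallel across the executors
df.toDF().foreachPartition(send_partition)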

There is 1 best solution below


I am doing something very similar. Here is what I discovered:

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="athena",
    table_name=str(args['value']),
    transformation_ctx="datasource0")
job.commit()

# Convert the DynamicFrame to a Spark DataFrame, then to Pandas
df = datasource0.toDF()
pandasDF = df.toPandas()

# generate_message and send_message are helpers defined elsewhere in the job
for index, row in pandasDF.iterrows():
    message_body = generate_message(
        row['bucket'], row['key'], row['version_id'])
    send_message(sqs_queue, json.loads(json.dumps(message_body)))
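
One caveat: toPandas() collects the entire table into the driver's memory, which may be a problem with millions of rows. A lighter variant, assuming the same columns and helper functions, is to stream the rows one Spark partition at a time with toLocalIterator() instead of converting to Pandas:

# Same send loop, but rows are pulled to the driver one partition at a time
# rather than materialising the entire table as a Pandas DataFrame.
for row in df.toLocalIterator():
    message_body = generate_message(
        row['bucket'], row['key'], row['version_id'])
    send_message(sqs_queue, json.loads(json.dumps(message_body)))

This still sends serially from the driver; the foreachPartition sketch under the question distributes the sends across the executors instead.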