The purpose of the code below is to retrieve data from Firebase and, after a duplicate check and split, append the items to a buffer. Once the buffer reaches 4 items, they are inserted into the DDB table using batch_writer.
To confirm: the code runs fine if I use a new/empty DynamoDB table, but once the inserted items go past roughly 40K, not all of the expected items get inserted. I reused an old table as well, with the same strange behavior.
I would like to understand whether this is a code logic problem or whether some modification is required on the AWS table side.
Results after running the code: -=-=-=-=-=-=-=-=-=-= The code below successfully inserted 32 items into the new table. But when I just changed the table name to the old table (which already contains 60,485 items), not a single one of the 32 items got inserted. The code didn't give any error, nor did CloudWatch show any alerts. However, if I keep executing the code for a longer period, where 150 rows should have been inserted, only one gets inserted.
The WCU and RCU are set to 25, and I have also changed the table to On-Demand, but the problem remains.
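For reference, this is roughly how the table's billing mode, provisioned throughput and approximate item count can be checked from boto3 (the region and table name are the same placeholders used in the code below):

```python
import boto3

# Sketch: inspect billing mode, throughput and approximate item count.
# 'my-region' and 'MS_TR_MS' match the placeholders in the code below.
client = boto3.client('dynamodb', region_name='my-region')
desc = client.describe_table(TableName='MS_TR_MS')['Table']

print(desc.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED'))
print(desc['ProvisionedThroughput'])  # Read/WriteCapacityUnits (0 when On-Demand)
print(desc['ItemCount'])              # refreshed roughly every six hours
print(desc['KeySchema'])              # partition / sort key definition
```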
Code 1: -=-=-
```python
import boto3
from botocore.exceptions import ClientError

# log_file_path, current_date_time, current_date, current_timestamp and main()
# are defined elsewhere in the full script.

# Define the DynamoDB tables
dynamodb = boto3.resource('dynamodb', region_name='my-region')
table_bms = dynamodb.Table('MS_TR_MS')

# Buffer and counter initialization
buffers = {
    'ms_str_msg': [],
}
counter = 0
buffer_size = 4
received_values = {}


def save_to_db(tob_id, top_name, top_data):
    with open(log_file_path, 'a') as log_file:
        try:
            # Check for duplicate entry
            key = f'{tob_id}_{top_name}'
            value = top_data
            if key in received_values and received_values[key] == value:
                # print(f"Duplicate entry for {key}: {value}. Discarding.")
                return
            else:
                received_values[key] = value
                print(f"Adding entry: {key}: {value}")

            # Split & append data to the buffer
            if isinstance(top_data, str):
                topic_data = top_data.split(":")[0]
                string_array = top_data.split(",")
            else:
                return
        except Exception as e:
            print(f'Error: {top_data} \n{e}')
            return

        # Checking the number of items inserted from buffer to the DDB table
        if counter >= 30:
            exit()

        # Updating items to single table
        if top_name == "ms_str_msg":
            if len(string_array) == 5:
                # table = table_ms.name
                print('Updating MS table')
                buffers[top_name].append({
                    # 'UID': {'S': unique_constant_value},
                    "timestamp": str(current_date_time),
                    "date": str(current_date),
                    "time": str(current_timestamp),
                    "tob_id": str(tob_id),
                    "voltage": str(string_array[0]),
                    "current_amps": str(string_array[1]),
                    "total_capacity": str(string_array[2]),
                    "SOC": str(string_array[3]),
                    "Battery Level": str(string_array[4])})

        # Check buffer size and update tables
        if len(buffers[top_name]) >= buffer_size:
            update_tables(top_name)


# Using batch_writer to write items from the buffer to the table
def update_tables(top_name):
    global counter
    try:
        with table_bms.batch_writer() as writer:
            for item in buffers[top_name]:
                writer.put_item(Item=item)
                counter = counter + 1
        buffers[top_name] = []
        # Counter to check the number of rows inserted
        print(f"Counter is {counter}")
        if counter >= 30:
            exit()
    except ClientError as err:
        print(f"Couldn't load data into table {table_bms.name}. "
              f"Here's why: {err.response['Error']['Code']}: {err.response['Error']['Message']}")


if __name__ == "__main__":
    main()
```
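To rule out the buffering/duplicate logic itself, a stripped-down version of the same batch_writer pattern can be run against the old table; a sketch is below (using "timestamp" as the partition key and the scan-based count are assumptions, since the table's actual key schema is not shown here):

```python
import uuid
import boto3

# Minimal sketch: write a few items with values that are guaranteed unique
# for the assumed partition key, bypassing the buffer/duplicate logic.
# "timestamp" as the partition key is an assumption; adjust to the table's
# actual KeySchema.
dynamodb = boto3.resource('dynamodb', region_name='my-region')
table = dynamodb.Table('MS_TR_MS')

with table.batch_writer() as writer:
    for i in range(10):
        writer.put_item(Item={
            "timestamp": str(uuid.uuid4()),  # unique key value per item
            "voltage": str(i),
        })

# Quick check that the puts landed; Count only covers the first 1 MB page
# of the scan, so it is an approximation on a large table.
print(table.scan(Select='COUNT')['Count'])
```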
Code 2:

```python
from botocore.exceptions import ClientError
import datetime
import dateutil.tz

# table_bms is the same Table handle created in the first snippet
def update_tables():
    try:
        eastern = dateutil.tz.gettz('US/Eastern')
        with table_bms.batch_writer() as writer:
            for i in range(30000):
                msg = f"counter {i}"
                writer.put_item(Item={"Time": datetime.datetime.now(tz=eastern).strftime('%m/%d/%Y %H:%M:%S.%f %Z'), "Message": msg})
    except ClientError as err:
        print(f"Couldn't load data into table {table_bms.name}: {err}")
```
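Since the console showed no alerts, the table's raw write metrics can also be pulled directly from CloudWatch; a rough sketch (same table name and region placeholders as above):

```python
import datetime
import boto3

# Sketch: sum the table's write metrics for the last hour to see whether
# the writes were throttled or never consumed any capacity at all.
cw = boto3.client('cloudwatch', region_name='my-region')
end = datetime.datetime.utcnow()
start = end - datetime.timedelta(hours=1)

for metric in ('ConsumedWriteCapacityUnits', 'WriteThrottleEvents'):
    stats = cw.get_metric_statistics(
        Namespace='AWS/DynamoDB',
        MetricName=metric,
        Dimensions=[{'Name': 'TableName', 'Value': 'MS_TR_MS'}],
        StartTime=start,
        EndTime=end,
        Period=300,
        Statistics=['Sum'],
    )
    print(metric, sorted(stats['Datapoints'], key=lambda d: d['Timestamp']))
```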