Unable to insert items into DDB table if the table has 30-40k items


The purpose of the code below is to retrieve data from Firebase and, after a duplicate check and a split, append the items to a buffer. Once the buffer reaches its limit of 4 items, the code inserts them into the DDB table using `batch_writer`.
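The flow described above (duplicate check, buffer, flush at 4 items) can be sketched without any AWS dependency, which makes it easier to test the buffering logic on its own. This is only a minimal sketch: `BufferedWriter` and `flush_fn` are hypothetical names, not part of boto3 or of the code in this question.

```python
class BufferedWriter:
    """Collects items and hands them to flush_fn in groups of buffer_size."""

    def __init__(self, flush_fn, buffer_size=4):
        self.flush_fn = flush_fn        # hypothetical callback that performs the real write
        self.buffer_size = buffer_size
        self.buffer = []
        self.last_seen = {}             # key -> last accepted value, used for the duplicate check

    def add(self, key, value):
        # Discard an entry whose value is unchanged since the last one seen for this key
        if key in self.last_seen and self.last_seen[key] == value:
            return False
        self.last_seen[key] = value
        self.buffer.append({"key": key, "value": value})
        # Flush as soon as the buffer limit is reached
        if len(self.buffer) >= self.buffer_size:
            self.flush()
        return True

    def flush(self):
        # Hand over a copy of the pending items, then empty the buffer
        if self.buffer:
            self.flush_fn(list(self.buffer))
            self.buffer.clear()


# In-memory stand-in for the table: every flushed batch is recorded in a list
written = []
bw = BufferedWriter(written.append, buffer_size=4)
for i in range(5):
    bw.add(f"k{i}", "v")
duplicate_accepted = bw.add("k0", "v")   # same key and value as before
bw.flush()                               # write whatever is left over
```

In the real code, `flush_fn` could be a function that opens `table_bms.batch_writer()` and calls `put_item` for each item. Calling `flush()` once at shutdown avoids losing the last few items that never reach the buffer limit.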

To confirm, the code runs fine against a new/empty DynamoDB table, but once the number of inserted items reaches about 40K, not all of the expected items get inserted. I also tried reusing an old table, with the same strange behavior.

I would like to understand whether this could be a problem in the code logic or whether some modification is required on the AWS table side.

Results after running the code:

The code below successfully inserted 32 items into the new table. But when I only changed the table name to the old table (which already contains 60,485 items), not a single one of the 32 items was inserted. The code did not raise any error, nor did CloudWatch show any alerts. However, if I keep running the code for a longer period, during which 150 rows should have been inserted, only one gets inserted.

The WCU and RCU are set to 25. I also changed the table to On-Demand, but the problem is still the same.
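One thing that may be worth ruling out (an assumption, since the table's key schema is not shown here): DynamoDB's `PutItem` replaces any existing item that has the same primary key, so writes whose key values repeat overwrite rows instead of adding new ones, and no error is raised. Below is a minimal sketch of a local check on the buffered items. `find_key_collisions` and the key attribute names `tob_id` and `timestamp` are hypothetical and would need to be replaced with the table's actual partition/sort key.

```python
from collections import Counter


def find_key_collisions(items, key_attrs):
    """Return {key_tuple: count} for primary-key values that occur more than once."""
    counts = Counter(tuple(item.get(attr) for attr in key_attrs) for item in items)
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical sample data; the attribute names stand in for the real key schema
sample = [
    {"tob_id": "7", "timestamp": "2024-01-01 10:00:00", "voltage": "3.7"},
    {"tob_id": "7", "timestamp": "2024-01-01 10:00:00", "voltage": "3.8"},
    {"tob_id": "8", "timestamp": "2024-01-01 10:00:00", "voltage": "3.6"},
]
collisions = find_key_collisions(sample, ["tob_id", "timestamp"])
print(collisions)
```

If the check reports collisions for the real key attributes, the number of rows in the table would grow more slowly than the number of `put_item` calls, which would match the symptom of "150 rows expected, only one inserted".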

Code:

`import boto3
from botocore.exceptions import ClientError

# Define the DynamoDB table
dynamodb = boto3.resource('dynamodb', region_name='my-region')
table_bms = dynamodb.Table('MS_TR_MS')

# Buffer and counter initialization
buffers = {
    'ms_str_msg': [],

}
counter = 0
buffer_size = 4
received_values = {}


def save_to_db(tob_id, top_name, top_data):
    with open(log_file_path, 'a') as log_file:
        try:
            # Check for duplicate entry
            key = f'{tob_id}_{top_name}'
            value = top_data
            if key in received_values and received_values[key] == value:
                # print(f"Duplicate entry for {key}: {value}. Discarding.")
                return
            else:
                received_values[key] = value
                print(f"Adding entry: {key}: {value}")
                
            # Split & Append data to the buffer            
            if isinstance(top_data, str):
                topic_data = top_data.split(":")[0]
                string_array = top_data.split(",")
            else:                
                return
                
        except Exception as e:
            print(f'Error: {top_data} \n{e}')
            return
        # Stop once 30 items have been written from the buffer to the DDB table
        if counter >= 30:
            exit()

        # Add items for the single table to the buffer
        if top_name == "ms_str_msg":
            # Only index string_array[0..4] when all five fields are present
            if len(string_array) == 5:
                # table = table_ms.name
                print('Updating MS table')

                buffers[top_name].append({
                    # 'UID': {'S': unique_constant_value},
                    "timestamp": str(current_date_time),
                    "date": str(current_date),
                    "time": str(current_timestamp),
                    "tob_id": str(tob_id),
                    "voltage": str(string_array[0]),
                    "current_amps": str(string_array[1]),
                    "total_capacity": str(string_array[2]),
                    "SOC": str(string_array[3]),
                    "Battery Level": str(string_array[4])})

        # Check buffer size and update tables
        if len(buffers[top_name]) >= buffer_size:
            update_tables(top_name)

# using batch_writer to write items from buffer to table

def update_tables(top_name):
    global counter
    try:
        with table_bms.batch_writer() as writer:
            for item in buffers[top_name]:
                writer.put_item(Item=item)
                counter = counter+1
        buffers[top_name] = []
        
        # Adding a counter to check number of rows inserting
        print(f"Counter is {counter}")
        if counter>=30:
            exit()
            
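    except ClientError as err:
        # Fill in the placeholders so the table name and error details are actually printed
        print("Couldn't load data into table %s. Here's why: %s: %s" % (
            table_bms.name, err.response['Error']['Code'], err.response['Error']['Message']))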

if __name__ == "__main__":
    main()`

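Code 2:

`import datetime

import dateutil.tz
from botocore.exceptions import ClientError

def update_tables():
    try:
        eastern = dateutil.tz.gettz('US/Eastern')
        with table_bms.batch_writer() as writer:
            for i in range(30000):
                msg = f"counter {i}"
                writer.put_item(Item={"Time": datetime.datetime.now(tz=eastern).strftime('%m/%d/%Y %H:%M:%S.%f %Z'), "Message": msg})
    except ClientError as err:
        # The original snippet was cut off after the try block; this handler mirrors the first listing
        print(f"Couldn't load data into table: {err}")`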
