Loading multiple records to Kinesis using PutRecords - how to re-send only failed records in case of failure?


I'm using Lambda to load data records into a Kinesis stream, often up to 500K records at a time. I batch these into chunks of 500 and use Boto3's put_records method to send them to Kinesis. I sometimes see failures due to exceeding the allowed throughput.

What is the best approach for retrying when this happens? Ideally I don't want duplicate messages in the data stream, so I don't want to simply resend all 500 records, but I'm struggling to see how to retry only the failed messages. The response from the put_records method doesn't make it obvious which records failed.

Can I rely on the order of the response Records list being in the same order as the list I pass to putRecords?

I know I can increase the number of shards, but I'd like to significantly increase the number of parallel Lambda functions loading data into this Kinesis stream. We plan to partition data based on the source system, and I can't guarantee that multiple functions won't write to the same shard and exceed the allowed throughput. As a result, I don't believe that increasing shards will remove the need for a retry strategy.

Alternatively, does anybody know if KPL will automatically handle this issue for me?


There are 2 answers below.

BEST ANSWER

Can I rely on the order of the response Records list being in the same order as the list I pass to putRecords?

Yes, you can rely on the order of the response. The response records are in the same order as the request records.

See the PutRecords response documentation: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html.

Records: An array of successfully and unsuccessfully processed record results, correlated with the request by natural ordering. A record that is successfully added to a stream includes SequenceNumber and ShardId in the result. A record that fails to be added to a stream includes ErrorCode and ErrorMessage in the result.

To retry the failed records, you have to develop your own retry mechanism. I have written one in Python as a recursive function with an incrementally increasing wait between retries:

import boto3
import time

kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"

def send_to_stream(kinesis_records, retry_count):
    put_response = kinesis_client.put_records(
        Records=kinesis_records,
        StreamName=KINESIS_STREAM_NAME
    )
    failed_count = put_response['FailedRecordCount']
    if failed_count > 0:
        if retry_count > 0:
            # Response records are in the same order as the request records,
            # so the failed inputs can be collected by index.
            retry_kinesis_records = []
            for idx, record in enumerate(put_response['Records']):
                if 'ErrorCode' in record:
                    retry_kinesis_records.append(kinesis_records[idx])
            # Incremental back-off: each successive retry waits a bit longer.
            time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
            send_to_stream(retry_kinesis_records, retry_count - 1)
        else:
            print(f'Not able to put records after retries. Records = {put_response["Records"]}')

In the above example, you can tune KINESIS_RETRY_COUNT and KINESIS_RETRY_WAIT_IN_SEC to your needs. You also have to ensure that your Lambda timeout is sufficient for the retries.
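
For completeness, the initial call for a larger batch might look like this (the chunks helper and the record shape are illustrative, not part of the retry mechanism above; PutRecords accepts at most 500 records per call):

def chunks(records, size=500):
    # Yield successive batches of at most `size` records.
    for i in range(0, len(records), size):
        yield records[i:i + size]

# Records must already be in the PutRecords format.
records = [{'Data': b'payload', 'PartitionKey': 'source-system-1'}]

for batch in chunks(records):
    send_to_stream(batch, KINESIS_RETRY_COUNT)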

Alternatively, does anybody know if KPL will automatically handle this issue for me?

I am not sure about the KPL, but from the documentation it looks like it has its own retry mechanism: https://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-retries-rate-limiting.html

SECOND ANSWER

While you should definitely handle failures and resend them, one way to minimise the number of extra records to resend is to simply send 500 records and, if you have more to send, wait 500ms before sending the next batch.

Waiting 500ms after every 500 records limits you to 1000 records/sec, which is the Kinesis PutRecords limit. Staying under this limit minimises the number of records that have to be sent multiple times.

Processing only 500 records at a time from a larger list can also make the retry logic easier: any records that fail can simply be appended to the end of the master list, where they will be retried when the loop checks whether any records in the master list are left to send to Kinesis.

Just remember to add a check that aborts if the master list isn't getting any smaller on each attempt to send 500 records, which will happen if at least one record fails every time. Eventually that record would be the last one in the list and would keep being resent forever unless this check is in place.

Note that these limits apply to one shard; if you have more shards, you can adjust the numbers accordingly.
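
A minimal sketch of this loop, assuming the same boto3 client as in the first answer (the drain function and the constant names are illustrative, not from the answer itself):

import boto3
import time

kinesis_client = boto3.client('kinesis')
KINESIS_STREAM_NAME = "your-kinesis-stream"
BATCH_SIZE = 500          # PutRecords maximum per call
BATCH_DELAY_IN_SEC = 0.5  # keeps a single producer under 1000 records/sec

def drain(master_list):
    while master_list:
        batch, master_list = master_list[:BATCH_SIZE], master_list[BATCH_SIZE:]
        put_response = kinesis_client.put_records(
            Records=batch,
            StreamName=KINESIS_STREAM_NAME
        )
        # Failed records go to the back of the master list for another pass.
        failed = [batch[idx] for idx, record in enumerate(put_response['Records'])
                  if 'ErrorCode' in record]
        # Abort if no progress was made on this attempt, otherwise a
        # permanently failing record would be resent forever.
        if len(failed) == len(batch):
            raise RuntimeError(f'{len(failed)} records keep failing, aborting')
        master_list.extend(failed)
        time.sleep(BATCH_DELAY_IN_SEC)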