Python/Boto - Writing to AWS CloudWatch Logs without sequence token

I am trying to send logs to AWS CloudWatch Logs using Python and Boto framework. I am doing this:

res = logs.put_log_events("FOO", "BAR",
    [{'timestamp': int(round(time.time() * 1000)),
      'message': time.strftime("%m/%d/%Y %H:%M:%S") + ' Scheduled monitoring check'}],
    sequence_token=None)

I get an error each time I run:

boto.logs.exceptions.InvalidSequenceTokenException: InvalidSequenceTokenException: 400 Bad Request
{u'message': u'The given sequenceToken is invalid. The next expected sequenceToken is: 49540113336360065754596906019042392283494234157161146226', u'expectedSequenceToken': u'49540113336360065754596906019042392283494234157161146226', u'__type': u'InvalidSequenceTokenException'}

It is somewhat impractical for me to store that token. It makes no sense: why can't I just append to the log stream?

How can I get around this?

8 Answers

BEST ANSWER

You can get around it by first looking up the uploadSequenceToken via describe_log_streams().

Essentially, you use logStreamNamePrefix to identify exactly the log stream you want to append to, then parse the uploadSequenceToken out of the response.

Returns all the log streams that are associated with the specified log group. The list returned in the response is ASCII-sorted by log stream name.

By default, this operation returns up to 50 log streams. If there are more log streams to list, the response will contain a nextToken value. You can also limit the number of log streams returned by specifying the limit parameter in the request. This operation has a limit of five transactions per second, after which transactions are throttled.

Request Syntax

response = client.describe_log_streams(
    logGroupName='string',
    logStreamNamePrefix='string',
    orderBy='LogStreamName'|'LastEventTime',
    descending=True|False,
    nextToken='string',
    limit=123
)

Response Syntax

{
    'logStreams': [
        {
            'logStreamName': 'string',
            'creationTime': 123,
            'firstEventTimestamp': 123,
            'lastEventTimestamp': 123,
            'lastIngestionTime': 123,
            'uploadSequenceToken': 'string',
            'arn': 'string',
            'storedBytes': 123
        },
    ],
    'nextToken': 'string'
}
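
Putting the two calls together, a minimal boto3 sketch of that flow (the group/stream names are the placeholders from the question):

import time

import boto3

logs = boto3.client('logs')

GROUP, STREAM = 'FOO', 'BAR'  # placeholder names from the question

# Look up the stream to recover its current uploadSequenceToken.
streams = logs.describe_log_streams(
    logGroupName=GROUP,
    logStreamNamePrefix=STREAM,
)['logStreams']

kwargs = {
    'logGroupName': GROUP,
    'logStreamName': STREAM,
    'logEvents': [{
        'timestamp': int(round(time.time() * 1000)),
        'message': time.strftime('%m/%d/%Y %H:%M:%S') + ' Scheduled monitoring check',
    }],
}

# A freshly created stream has no uploadSequenceToken yet,
# so only pass sequenceToken when the field is present.
if streams and 'uploadSequenceToken' in streams[0]:
    kwargs['sequenceToken'] = streams[0]['uploadSequenceToken']

logs.put_log_events(**kwargs)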
ANSWER

You can't, that's how it works:

Every PutLogEvents request must include the sequenceToken obtained from the response of the previous request. An upload in a newly created log stream does not require a sequenceToken.

(source)
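
In practice that means chaining the nextSequenceToken from each response into the next call. A minimal sketch (group/stream names are placeholders):

import time

import boto3

logs = boto3.client('logs')

token = None
for line in ['first message', 'second message']:
    kwargs = {
        'logGroupName': 'FOO',
        'logStreamName': 'BAR',
        'logEvents': [{'timestamp': int(time.time() * 1000), 'message': line}],
    }
    if token:
        kwargs['sequenceToken'] = token
    # Each response carries the token that the next request must present.
    token = logs.put_log_events(**kwargs)['nextSequenceToken']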

ANSWER

Most of the answers here work, but be aware: if you have multiple processes writing to the same stream in quick succession, they will hit this exception constantly, and retrying in a loop just re-runs the same race condition. Everyone should be aware of that!

ANSWER

As mentioned in the docs:

You can also get the sequence token in the expectedSequenceToken field from InvalidSequenceTokenException.

However, the problem is that boto3 does not expose an expectedSequenceToken field on the exception, as discussed in this issue:

Boto3 doesn't support parsing additional parameters from exceptions, it only adds a Code and a Message. Labeling as a feature request, I think this is something we should add, but for now your best workaround would be to parse the error message.

Obviously, parsing the message to get the token is not ideal, because the format of the message may change. But it gives a simple working solution without calling describe_log_streams.

import time

import boto3


def append_log(group: str, stream: str, msg: str):
    logs = boto3.client('logs')

    def put(token=None, repeat: int = 0):
        events = [{
            'timestamp': int(round(time.time() * 1000)),
            'message': msg
        }]
        try:
            if token:
                logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events, sequenceToken=token)
            else:
                logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events)
        except (logs.exceptions.InvalidSequenceTokenException, logs.exceptions.DataAlreadyAcceptedException) as e:
            if repeat > 10:
                raise Exception("Too many repeats to write log")
            # In both exceptions the expected token follows the first
            # colon in the error message, so parse it out and retry.
            error_msg = e.response['Error']['Message']
            put(error_msg[error_msg.index(":") + 1:].strip(), repeat + 1)

    try:
        put()
    except logs.exceptions.ResourceNotFoundException:
        # The stream (or the whole group) does not exist yet: create it and retry.
        try:
            logs.create_log_stream(logGroupName=group, logStreamName=stream)
        except logs.exceptions.ResourceNotFoundException:
            logs.create_log_group(logGroupName=group)
            logs.create_log_stream(logGroupName=group, logStreamName=stream)
        put()

The function will create the group and the stream if they do not exist.
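
Usage is then a single call (the group and stream names here are just examples):

append_log('my-app', 'my-stream', 'Scheduled monitoring check')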

ANSWER

Here's a logging class I created with help from the other answers here that doesn't require the logs:DescribeLogStreams IAM permission. This is meant to be a standalone module that can be imported (with the boto3 session passed in at class initialization).

import time

class CloudWatch:
    def __init__(self, boto3, log_group):
        self.client = boto3.client("logs")
        self.log_group = log_group
        self.sequence_token = None

    def log(self, message):
        print(message)  # Delete this if you don't want stdout as well.

        # One stream per day.
        log_stream = time.strftime('%Y-%m-%d')

        event_log = {
            'logGroupName': self.log_group,
            'logStreamName': log_stream,
            'logEvents': [
                {
                    'timestamp': int(round(time.time() * 1000)),
                    'message': message
                }
            ],
        }

        if self.sequence_token is not None:
            event_log.update({"sequenceToken": self.sequence_token})

        for _ in range(3):
            try:
                response = self.client.put_log_events(**event_log)
                self.sequence_token = response["nextSequenceToken"]
                return
            except self.client.exceptions.ResourceNotFoundException:
                # Group and/or stream missing: create both, then retry.
                try:
                    self.client.create_log_group(logGroupName=self.log_group)
                except self.client.exceptions.ResourceAlreadyExistsException:
                    pass
                try:
                    self.client.create_log_stream(logGroupName=self.log_group, logStreamName=log_stream)
                except self.client.exceptions.ResourceAlreadyExistsException:
                    pass
            except self.client.exceptions.InvalidSequenceTokenException as e:
                # Recover the expected token from the error message and retry.
                event_log.update({"sequenceToken": e.response["Error"]["Message"].split("is: ")[-1]})
                continue
            except self.client.exceptions.DataAlreadyAcceptedException:
                return
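
Usage might look like this (assuming the class above is saved in cloudwatch.py; the log group name is just an example):

import boto3

from cloudwatch import CloudWatch  # hypothetical module name for the class above

cw = CloudWatch(boto3, "my-app-logs")
cw.log("Scheduled monitoring check")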
ANSWER

Most of the answers here are now outdated. As of today, the boto3 docs at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html say:

Warning: The sequence token is now ignored in PutLogEvents actions

So these examples now contain unneeded complexity.

This might be useful as a simple cloudwatch logs client.

import datetime
import json
import time
from typing import List

import boto3

class CloudwatchLogs:
    def __init__(self, log_group):
        self.logs = boto3.client('logs')
        self.log_stream = f'{datetime.datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S")}-logstream'

        self.log_group = log_group  # the log group is assumed to exist already
        self.created = False

    def _maybe_create(self):
        # Lazily create the stream on first use.
        if not self.created:
            try:
                self.logs.create_log_stream(logGroupName=self.log_group, logStreamName=self.log_stream)
                self.created = True
            except self.logs.exceptions.ResourceAlreadyExistsException:
                pass

    def log(self, data: List[dict]):
        def as_cloudwatch(item) -> dict:
            # CloudWatch expects millisecond timestamps.
            return {'timestamp': int(time.time_ns() / 1_000_000), 'message': json.dumps(item)}

        self._maybe_create()

        # No sequenceToken needed: it is now ignored by PutLogEvents.
        self.logs.put_log_events(
            logGroupName=self.log_group,
            logStreamName=self.log_stream,
            logEvents=[as_cloudwatch(i) for i in data]
        )

Then you can say

logs = CloudwatchLogs(log_group="/log/group")
logs.log([{"eventName": "NiceEvent", "text": "Something"}])
ANSWER

AWS CloudWatch put_log_events example code:

import boto3
import time


client = boto3.client('logs')

LOG_GROUP = 'cloudwatch_customlog'
LOG_STREAM = '{}-{}'.format(time.strftime('%Y-%m-%d'), 'logstream')

# Create the group and stream if they don't already exist.
try:
    client.create_log_group(logGroupName=LOG_GROUP)
except client.exceptions.ResourceAlreadyExistsException:
    pass

try:
    client.create_log_stream(logGroupName=LOG_GROUP, logStreamName=LOG_STREAM)
except client.exceptions.ResourceAlreadyExistsException:
    pass

# Fetch the stream's current uploadSequenceToken, if it has one.
response = client.describe_log_streams(
    logGroupName=LOG_GROUP,
    logStreamNamePrefix=LOG_STREAM
)

event_log = {
    'logGroupName': LOG_GROUP,
    'logStreamName': LOG_STREAM,
    'logEvents': [
        {
            'timestamp': int(round(time.time() * 1000)),
            'message': time.strftime('%Y-%m-%d %H:%M:%S') + '\t Your custom log messages'
        }
    ],
}

# A brand-new stream has no uploadSequenceToken yet.
if 'uploadSequenceToken' in response['logStreams'][0]:
    event_log.update({'sequenceToken': response['logStreams'][0]['uploadSequenceToken']})

response = client.put_log_events(**event_log)
print(response)
ANSWER

To answer the why part with an educated guess: it's the nature of a scalable asynchronous service.

If Amazon did not ask you to maintain a sequence number, they could never scale the CloudWatch service out across many instances while still guaranteeing that your logs appear in exactly the order they happened (and imagine how annoying out-of-order log entries would be when debugging a problem). Any tiny deviation in clocks, network latency, or other delays along the path to the log acceptors would introduce ordering problems.

But since they do ask you for a sequence number, they can scale the service out easily and simply merge-sort incoming log entries back together while still retaining the correct order: your log order.