Pushing DynamoDB events into Eventbridge using Pipes

101 Views Asked by At

I'm trying to take advantage of the (newish) AWS Pipes functionality to connect DynamoDB events to Eventbridge

I have a Cloudformation template (see below) and a Python script to push an item into DynamoDB (also see below)

The template deploys fine, the script runs fine, I can see the item in DynamoDB via the console, but the target Lambda function doesn't seem to receive an event from Eventbridge

Where am I going wrong here?

TIA


template.yaml

---
AWSTemplateFormatVersion: '2010-09-09'
Parameters:
  MemorySizeDefault:
    Type: String
    Default: '512'
  RuntimeVersion:
    Type: String
    Default: '3.10'
  TimeoutDefault:
    Type: String
    Default: '5'
Resources:
  MyTable:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      BillingMode: PAY_PER_REQUEST
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
  MyPipe:
    Type: 'AWS::Pipes::Pipe'
    Properties:
      RoleArn:
        Fn::GetAtt:
        - MyPipeRole
        - Arn
      SourceParameters:
        DynamoDBStreamParameters:
          BatchSize: 1
          StartingPosition: LATEST
      Source:
        !GetAtt MyTable.StreamArn
      Target:
        !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:event-bus/default'
  MyPipeRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Action: sts:AssumeRole
          Effect: Allow
          Principal:
            Service: pipes.amazonaws.com
      Policies:
      - PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Action:
            - dynamodb:DescribeStream
            - dynamodb:GetRecords
            - dynamodb:GetShardIterator
            - dynamodb:ListStreams
            - logs:CreateLogGroup
            - logs:CreateLogStream
            - logs:PutLogEvents
            Effect: Allow
            Resource: "*"
        PolicyName:
          Fn::Sub: my-pipe-role-policy-${AWS::StackName}
  MyEventRule:
    Type: AWS::Events::Rule
    Properties:
      EventPattern:
        detail:
          pk:
          - prefix: WIDGET
      Targets:
      - Id: my-function
        Arn:
          Fn::GetAtt:
          - MyFunction
          - Arn
      State: ENABLED
  MyEventRulePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      FunctionName:
        Ref: MyFunction
      SourceArn:
        Fn::GetAtt:
        - MyEventRule
        - Arn
  MyFunction:
    Type: AWS::Lambda::Function
    Properties:
      Role:
        Fn::GetAtt:
        - MyFunctionRole
        - Arn
      MemorySize:
        Ref: MemorySizeDefault
      Timeout:
        Ref: TimeoutDefault
      Code:
        ZipFile: |-
          def handler(event, context):
              print (event)
      Handler: index.handler
      Runtime:
        Fn::Sub: python${RuntimeVersion}
  MyFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Action: sts:AssumeRole
          Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
      Policies:
      - PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Action:
            - logs:CreateLogGroup
            - logs:CreateLogStream
            - logs:PutLogEvents
            Effect: Allow
            Resource: "*"
        PolicyName:
          Fn::Sub: my-function-role-policy-${AWS::StackName}
Outputs:
  Function:
    Value:
      Ref: MyFunction
  Table:
    Value:
      Ref: MyTable

push_item.py

from botocore.exceptions import ClientError

import boto3, json, os

def fetch_outputs(cf, stackname):
    outputs={}
    for stack in cf.describe_stacks()["Stacks"]:
        if (stack["StackName"].startswith(stackname) and
            "Outputs" in stack):
            for output in stack["Outputs"]:
                outputs[output["OutputKey"]]=output["OutputValue"]
    return outputs

if __name__=="__main__":
    try:
        params=dict([row.split("=")
                     for row in open("app.props").read().split("\n")
                     if row!=''])
        stackname=params["AppName"]
        cf=boto3.client("cloudformation")
        outputs=fetch_outputs(cf, stackname)
        tablename=outputs["Table"]
        table=boto3.resource("dynamodb").Table(tablename)
        print (table.put_item(Item={"pk": "WIDGET",
                                    "sk": "ABCDE"}))
    except RuntimeError as error:
        print ("Error: %s" % error)
    except ClientError as error:
        print ("Error: %s" % (str(error)))
0

There are 0 best solutions below