I'm trying to take advantage of the (newish) EventBridge Pipes functionality to connect DynamoDB stream events to EventBridge.
I have a CloudFormation template (see below) and a Python script that pushes an item into DynamoDB (also below).
The template deploys fine, the script runs fine, and I can see the item in DynamoDB via the console, but the target Lambda function never seems to receive an event from EventBridge.
Where am I going wrong here?
TIA
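For debugging, I've been considering a catch-all rule along these lines (a sketch only, not part of the template below; `DebugRule` and `DebugRulePermission` are hypothetical names). Matching on the `account` field should match every event on the default bus, which would at least confirm whether the pipe is emitting anything at all:

```yaml
# Hypothetical catch-all debug rule: matches any event on the default bus
# for this account and forwards it to the existing Lambda function.
DebugRule:
  Type: AWS::Events::Rule
  Properties:
    EventPattern:
      account:
        - !Ref AWS::AccountId
    Targets:
      - Id: debug-function
        Arn: !GetAtt MyFunction.Arn
# The debug rule needs its own invoke permission, since the existing
# permission below is scoped to MyEventRule's ARN.
DebugRulePermission:
  Type: AWS::Lambda::Permission
  Properties:
    Action: lambda:InvokeFunction
    Principal: events.amazonaws.com
    FunctionName: !Ref MyFunction
    SourceArn: !GetAtt DebugRule.Arn
```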
template.yaml
---
AWSTemplateFormatVersion: '2010-09-09'
Parameters:
  MemorySizeDefault:
    Type: String
    Default: '512'
  RuntimeVersion:
    Type: String
    Default: '3.10'
  TimeoutDefault:
    Type: String
    Default: '5'
Resources:
  MyTable:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      BillingMode: PAY_PER_REQUEST
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
  MyPipe:
    Type: 'AWS::Pipes::Pipe'
    Properties:
      RoleArn:
        Fn::GetAtt:
          - MyPipeRole
          - Arn
      SourceParameters:
        DynamoDBStreamParameters:
          BatchSize: 1
          StartingPosition: LATEST
      Source: !GetAtt MyTable.StreamArn
      Target: !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:event-bus/default'
  MyPipeRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: pipes.amazonaws.com
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Effect: Allow
                Resource: "*"
          PolicyName:
            Fn::Sub: my-pipe-role-policy-${AWS::StackName}
  MyEventRule:
    Type: AWS::Events::Rule
    Properties:
      EventPattern:
        detail:
          pk:
            - prefix: WIDGET
      Targets:
        - Id: my-function
          Arn:
            Fn::GetAtt:
              - MyFunction
              - Arn
      State: ENABLED
  MyEventRulePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      FunctionName:
        Ref: MyFunction
      SourceArn:
        Fn::GetAtt:
          - MyEventRule
          - Arn
  MyFunction:
    Type: AWS::Lambda::Function
    Properties:
      Role:
        Fn::GetAtt:
          - MyFunctionRole
          - Arn
      MemorySize:
        Ref: MemorySizeDefault
      Timeout:
        Ref: TimeoutDefault
      Code:
        ZipFile: |-
          def handler(event, context):
              print(event)
      Handler: index.handler
      Runtime:
        Fn::Sub: python${RuntimeVersion}
  MyFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Effect: Allow
                Resource: "*"
          PolicyName:
            Fn::Sub: my-function-role-policy-${AWS::StackName}
Outputs:
  Function:
    Value:
      Ref: MyFunction
  Table:
    Value:
      Ref: MyTable
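For reference, this is roughly the payload I assume the pipe forwards to the bus: a raw DynamoDB Streams record with typed attribute values (the field names follow the Streams API; the concrete values are just my test item). A small sketch of that shape, with a helper that digs the partition key out of it:

```python
# Sketch only: my assumption of what a single DynamoDB Streams record looks
# like; typed attribute values ({"S": ...}) per the Streams API.
sample_record = {
    "eventID": "1",
    "eventName": "INSERT",
    "dynamodb": {
        "Keys": {"pk": {"S": "WIDGET"}, "sk": {"S": "ABCDE"}},
        "NewImage": {"pk": {"S": "WIDGET"}, "sk": {"S": "ABCDE"}},
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}

def record_pk(record):
    """Pull the partition key out of a stream record's typed Keys map."""
    return record["dynamodb"]["Keys"]["pk"]["S"]

print(record_pk(sample_record))  # WIDGET
```

Note that in a record of this shape, `pk` is nested several levels down rather than sitting at the top level.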
push_item.py
from botocore.exceptions import ClientError

import boto3

def fetch_outputs(cf, stackname):
    outputs = {}
    for stack in cf.describe_stacks()["Stacks"]:
        if (stack["StackName"].startswith(stackname) and
                "Outputs" in stack):
            for output in stack["Outputs"]:
                outputs[output["OutputKey"]] = output["OutputValue"]
    return outputs

if __name__ == "__main__":
    try:
        # app.props is a simple key=value properties file containing AppName
        params = dict([row.split("=")
                       for row in open("app.props").read().split("\n")
                       if row != ''])
        stackname = params["AppName"]
        cf = boto3.client("cloudformation")
        outputs = fetch_outputs(cf, stackname)
        tablename = outputs["Table"]
        table = boto3.resource("dynamodb").Table(tablename)
        print(table.put_item(Item={"pk": "WIDGET",
                                   "sk": "ABCDE"}))
    except RuntimeError as error:
        print("Error: %s" % error)
    except ClientError as error:
        print("Error: %s" % str(error))
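In case it helps, fetch_outputs can be sanity-checked without touching AWS by handing it a stub in place of the boto3 client (the stub class and stack/output values here are hypothetical; the stub implements only the one call the function makes):

```python
class StubCloudFormation:
    """Minimal stand-in for boto3's CloudFormation client; implements only
    describe_stacks, returning a response with the same shape."""
    def describe_stacks(self):
        return {"Stacks": [{"StackName": "myapp-prod",
                            "Outputs": [{"OutputKey": "Table",
                                         "OutputValue": "myapp-table"}]}]}

def fetch_outputs(cf, stackname):
    # Same logic as in push_item.py: collect outputs from stacks whose
    # name starts with the given prefix.
    outputs = {}
    for stack in cf.describe_stacks()["Stacks"]:
        if (stack["StackName"].startswith(stackname) and
                "Outputs" in stack):
            for output in stack["Outputs"]:
                outputs[output["OutputKey"]] = output["OutputValue"]
    return outputs

print(fetch_outputs(StubCloudFormation(), "myapp"))  # {'Table': 'myapp-table'}
```

A stack name that doesn't match the prefix simply yields an empty dict, which is why a missing "Table" key in the outputs would show up as a KeyError later in the script.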