I have a Python script that uses boto3 to interact with AWS services. I'm trying to extend this script so that, once executed, it activates GuardDuty and sets up the logic to forward high-severity findings to a Slack channel.
I followed this AWS documentation.
Here is the code I have produced so far:
def get_aws_clients():
    """Create the module-level boto3 clients used by the rest of the script.

    Sets the default boto3 session region from ``cfg.aws.region`` and binds
    the ``guardduty``, ``events``, ``sns`` and ``chatbot`` globals.
    """
    #Omitted the rest
    global guardduty
    global events
    global sns
    global chatbot
    boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1
    #Omitted the rest
    guardduty = boto3.client('guardduty')
    events = boto3.client('events')
    sns = boto3.client('sns')
    # The chatbot client must not inherit the session's default region:
    # with us-east-1 boto3 builds https://chatbot.us-east-1.amazonaws.com,
    # which cannot be reached ("Could not connect to the endpoint URL").
    # The Chatbot API is only served from a few regions, so pin the client
    # to one of them. The configuration it creates is still global.
    # NOTE(review): us-east-2 is taken from the AWS Chatbot endpoint list;
    # confirm it against the current AWS General Reference.
    chatbot = boto3.client('chatbot', region_name='us-east-2')
def enable_guardduty():
    """Return a GuardDuty detector id, creating an enabled detector if needed.

    If ``guardduty.list_detectors()`` reports at least one detector, the
    first id is returned unchanged; otherwise a new detector is created with
    ``Enable=True`` and its id is returned.
    """
    detector_ids = guardduty.list_detectors()['DetectorIds']
    if not detector_ids:
        # Nothing listed yet: create the detector and hand back its id.
        return guardduty.create_detector(Enable=True)['DetectorId']

    log.info("GuardDuty already enabled.")
    return detector_ids[0]
def configure_cloudwatch_events(rule_name):
    """Put an enabled events rule that matches GuardDuty findings of severity 7-8.9.

    Args:
        rule_name: Name passed to ``events.put_rule``.

    Returns:
        The ``RuleArn`` from the ``put_rule`` response.
    """
    # The severities to match are listed explicitly, in 0.1 steps.
    matched_severities = [
        7, 7.0, 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.8, 7.9,
        8, 8.0, 8.1, 8.2, 8.3, 8.4, 8.5, 8.6, 8.7, 8.8, 8.9,
    ]
    finding_pattern = {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {"severity": matched_severities},
    }
    rule = events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(finding_pattern),
        State='ENABLED',
    )
    return rule['RuleArn']
def configure_sns_topic(topic_name):
    """Return the ARN of the SNS topic ``topic_name``, creating it if absent.

    An existing topic is returned as-is. A newly created topic gets an
    access policy allowing ``events.amazonaws.com`` to publish to it, so the
    events rule can deliver findings.

    Args:
        topic_name: Name of the topic (the last ``:``-separated ARN field).

    Returns:
        The topic ARN.
    """
    # list_topics returns one page at a time, so walk every page. Checking
    # only the first response would miss a topic listed on a later page.
    for page in sns.get_paginator('list_topics').paginate():
        for topic in page['Topics']:
            if topic['TopicArn'].split(':')[-1] == topic_name:
                log.info("Found existing SNS topic")
                return topic['TopicArn']

    topic_arn = sns.create_topic(Name=topic_name)['TopicArn']
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "events.amazonaws.com"
            },
            "Action": "sns:Publish",
            "Resource": topic_arn
        }]
    }
    sns.set_topic_attributes(
        TopicArn=topic_arn,
        AttributeName='Policy',
        AttributeValue=json.dumps(policy)
    )
    return topic_arn
def connect_sns_to_slack(topic_arn, slack_team_id, slack_channel_id,
                         iam_role_arn=None,
                         configuration_name='GuardDutySlackConfig'):
    """Create the Chatbot Slack channel configuration fed by ``topic_arn``.

    Args:
        topic_arn: SNS topic whose messages are forwarded to Slack.
        slack_team_id: Slack workspace (team) id.
        slack_channel_id: Slack channel id.
        iam_role_arn: ARN of the IAM role passed as ``IamRoleArn``. When
            omitted, the previously hard-coded value is used.
        configuration_name: Name of the Chatbot configuration.

    Returns:
        The ``SlackChannelConfigurationArn`` from the response.
    """
    if iam_role_arn is None:
        # NOTE(review): this default has the shape of a managed *policy* ARN
        # ("...:policy/..."), not a role ARN, so the API will likely reject
        # it. Pass the ARN of a role that Chatbot can assume instead, e.g.
        # the one built by create_chatbot_iam_role().
        iam_role_arn = 'arn:aws:iam::aws:policy/service-role/AWSServiceRoleForAWSChatbot'
    response = chatbot.create_slack_channel_configuration(
        ConfigurationName=configuration_name,
        SlackTeamId=slack_team_id,
        SlackChannelId=slack_channel_id,
        SnsTopicArns=[topic_arn],
        IamRoleArn=iam_role_arn
    )
    return response['SlackChannelConfigurationArn']
def create_chatbot_iam_role():
    """Return the ARN of the ``UserRoleForChatbot`` role, creating it if needed.

    A newly created role trusts ``management.chatbot.amazonaws.com`` and gets
    the AWS managed ``ReadOnlyAccess`` policy attached. If the role already
    exists, its ARN is looked up and returned, so callers always receive a
    usable ARN.

    Returns:
        The role ARN.
    """
    # NOTE(review): ``iam`` is expected to be a module-level boto3 IAM
    # client; it is not among the clients visibly created in
    # get_aws_clients(), so confirm it is defined.
    role_name = 'UserRoleForChatbot'
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "management.chatbot.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }]
    }
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
    except iam.exceptions.EntityAlreadyExistsException:
        log.info("IAM role already existing.")
        # Look the role up rather than returning None.
        return iam.get_role(RoleName=role_name)['Role']['Arn']

    role_arn = response['Role']['Arn']
    log.info(f"User Iam role created for chatbot: {role_arn}")
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn='arn:aws:iam::aws:policy/ReadOnlyAccess'
    )
    return role_arn
def create_cloudwatch_event_rule(rule_name, target_arn):
    """Attach ``target_arn`` as target ``'1'`` of the rule ``rule_name``.

    Returns:
        True when ``put_targets`` reports zero failed entries, else False.
    """
    rule_target = {'Id': '1', 'Arn': target_arn}
    result = events.put_targets(Rule=rule_name, Targets=[rule_target])
    failed_entries = result['FailedEntryCount']
    return failed_entries == 0
def configure_guard_duty_to_slack():
    """Wire GuardDuty findings through an events rule and SNS into Slack.

    Steps, in order: enable GuardDuty, put the findings rule, get or create
    the SNS topic, create the Chatbot Slack configuration, then attach the
    topic as the rule's target. Any exception is logged and the process
    exits with status 1.
    """
    try:
        # Configure GuardDuty
        detector_id = enable_guardduty()
        log.info(f"GuardDuty enabled with DetectorId: {detector_id}")

        # Configure CloudWatch Events
        rule_name = 'GuardDutyFindingRule'
        rule_arn = configure_cloudwatch_events(rule_name)
        log.info(f"CloudWatch Events rule configured with ARN: {rule_arn}")

        # Configure SNS Topic
        topic_arn = configure_sns_topic('GuardDutyFindingTopic')
        log.info(f"SNS Topic configured with ARN: {topic_arn}")

        # Connect SNS Topic to Slack through Chatbot
        slack_channel_id = 'XXXXXXXX'
        slack_team_id = 'YYYYYYYYY'
        slack_config_arn = connect_sns_to_slack(topic_arn, slack_team_id, slack_channel_id)
        log.info(f"Slack configuration created with ARN: {slack_config_arn}")

        # Target Cloudwatch to SNS topic
        if create_cloudwatch_event_rule(rule_name, topic_arn):
            log.info("CloudWatch Event rule created successfully.")
        else:
            # A failed target means findings never reach SNS, so report it
            # at error level like the exception path below.
            log.error("Failed to create CloudWatch Event rule.")
    except Exception as ex:
        log.error(f"*** {ex}")
        sys.exit(1)
I get this exception when the chatbot.create_slack_channel_configuration
method is called:
*** Could not connect to the endpoint URL: "https://chatbot.us-east-1.amazonaws.com/create-slack-channel-configuration"
Everything up to this point is working just fine, I believe. All the services are successfully created and I can verify them through the AWS console. I also tried to configure the Slack client via the AWS console, and it does work that way. I receive notifications on my Slack channel, which is a positive outcome. However, I need to automate all of these tasks programmatically through the Python script.
I'm not sure whether this approach is correct or whether I'm missing something; I'm new to AWS. Any help or suggestions would be greatly appreciated. Thanks in advance!
Chances are this is due to the fact that AWS Chatbot is a global service that doesn't accept any region.
I see you have
boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1
which sets us-east-1 as the default region and causes every boto3 client to use that region's endpoint. However, AWS Chatbot doesn't have a regional endpoint, only a global endpoint. If you set the region some other way, then it would work: