Configure AWS GuardDuty to post Findings notifications to Slack

81 Views Asked by At

I have a Python script that uses boto3 to interact with AWS services. I'm trying to extend this script so that, once executed, it enables GuardDuty and sets up the logic to forward high-severity findings to a Slack channel.

I followed this AWS documentation.

Here is the code I have produced so far:


def get_aws_clients():
    """Create the boto3 clients this script uses and publish them as module globals.

    The default boto3 session is set to ``cfg.aws.region`` first, and every
    client below is created without an explicit region, so each one
    (``chatbot`` included) is built from that default session.
    """
    #Omitted the rest
    global guardduty, events, sns, chatbot
    boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1
    #Omitted the rest
    guardduty = boto3.client('guardduty')
    events = boto3.client('events')
    sns = boto3.client('sns')
    chatbot = boto3.client('chatbot')


def enable_guardduty():
    """Make sure GuardDuty is enabled and return the detector ID.

    If no detector exists, one is created in the enabled state. If a detector
    already exists, its status is checked and it is re-enabled when it is not
    currently ``ENABLED``.

    Returns:
        The ID of the existing or newly created detector.
    """
    detector_ids = guardduty.list_detectors()['DetectorIds']
    if not detector_ids:
        # No detector yet: create one that is enabled from the start
        return guardduty.create_detector(Enable=True)['DetectorId']

    detector_id = detector_ids[0]
    # A detector can exist while suspended, so its presence alone does not
    # prove that GuardDuty is actually enabled.
    status = guardduty.get_detector(DetectorId=detector_id)['Status']
    if status == 'ENABLED':
        log.info("GuardDuty already enabled.")
    else:
        guardduty.update_detector(DetectorId=detector_id, Enable=True)
        log.info("GuardDuty detector existed but was not enabled; enabled it.")
    return detector_id


def configure_cloudwatch_events(rule_name):
    """Register an enabled rule matching GuardDuty findings of severity 7 to 8.9.

    Args:
        rule_name: Name passed to ``events.put_rule``.

    Returns:
        The ``RuleArn`` reported by ``put_rule``.
    """
    # The pattern lists each severity explicitly, in both integer and
    # one-decimal form: 7, 7.0 ... 7.9, 8, 8.0 ... 8.9.
    severities = []
    for whole in (7, 8):
        severities.append(whole)
        severities.extend((whole * 10 + tenth) / 10 for tenth in range(10))

    pattern = {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {"severity": severities},
    }
    rule = events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(pattern),
        State='ENABLED',
    )
    return rule['RuleArn']


def configure_sns_topic(topic_name):
    """Return the ARN of the SNS topic named ``topic_name``, creating it if needed.

    An existing topic is returned untouched. A newly created topic gets an
    access policy that lets ``events.amazonaws.com`` publish to it.

    Args:
        topic_name: Name of the topic (the last ``:``-separated ARN segment).

    Returns:
        The topic ARN.
    """
    # list_topics is paginated; walk every page so that an existing topic on
    # a later page is still found and does not have its policy replaced.
    for page in sns.get_paginator('list_topics').paginate():
        for topic in page['Topics']:
            if topic['TopicArn'].split(':')[-1] == topic_name:
                log.info("Found existing SNS topic")
                return topic['TopicArn']

    topic_arn = sns.create_topic(Name=topic_name)['TopicArn']
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "events.amazonaws.com"
            },
            "Action": "sns:Publish",
            "Resource": topic_arn
        }]
    }
    sns.set_topic_attributes(
        TopicArn=topic_arn,
        AttributeName='Policy',
        AttributeValue=json.dumps(policy)
    )
    return topic_arn


def connect_sns_to_slack(
    topic_arn,
    slack_team_id,
    slack_channel_id,
    iam_role_arn='arn:aws:iam::aws:policy/service-role/AWSServiceRoleForAWSChatbot',
    configuration_name='GuardDutySlackConfig',
):
    """Create an AWS Chatbot Slack channel configuration subscribed to an SNS topic.

    Args:
        topic_arn: ARN of the SNS topic whose messages are forwarded to Slack.
        slack_team_id: ID of the Slack workspace.
        slack_channel_id: ID of the Slack channel.
        iam_role_arn: Value passed as ``IamRoleArn``. The default is kept for
            backward compatibility.
            NOTE(review): the default has the shape of an AWS-managed *policy*
            ARN (``...:policy/...``), not a role ARN — callers should pass the
            ARN of a role, e.g. one produced by ``create_chatbot_iam_role()``.
        configuration_name: Name of the Chatbot configuration to create.

    Returns:
        The ``SlackChannelConfigurationArn`` field of the service response.
    """
    response = chatbot.create_slack_channel_configuration(
        ConfigurationName=configuration_name,
        SlackTeamId=slack_team_id,
        SlackChannelId=slack_channel_id,
        SnsTopicArns=[topic_arn],
        IamRoleArn=iam_role_arn
    )
    return response['SlackChannelConfigurationArn']


def create_chatbot_iam_role():
    """Create the ``UserRoleForChatbot`` IAM role if needed and return its ARN.

    A newly created role gets the AWS-managed ``ReadOnlyAccess`` policy
    attached. If the role already exists, its ARN is looked up and returned
    so the result can always be used as a role ARN.

    Returns:
        The ARN of the role.
    """
    role_name = 'UserRoleForChatbot'
    # NOTE(review): confirm this is the service principal AWS Chatbot uses
    # when assuming a user-defined channel role.
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "management.chatbot.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }]
    }
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
    except iam.exceptions.EntityAlreadyExistsException:
        # Re-running the script must still yield a usable ARN, not None
        log.info("IAM role already existing.")
        return iam.get_role(RoleName=role_name)['Role']['Arn']

    role_arn = response['Role']['Arn']
    log.info(f"User Iam role created for chatbot: {role_arn}")
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn='arn:aws:iam::aws:policy/ReadOnlyAccess'
    )
    return role_arn


def create_cloudwatch_event_rule(rule_name, target_arn):
    """Attach ``target_arn`` to the rule ``rule_name`` as target ID ``'1'``.

    Returns:
        True when ``put_targets`` reports a ``FailedEntryCount`` of zero.
    """
    target = {'Id': '1', 'Arn': target_arn}
    result = events.put_targets(Rule=rule_name, Targets=[target])
    failed_entries = result['FailedEntryCount']
    return failed_entries == 0


def configure_guard_duty_to_slack(
    rule_name='GuardDutyFindingRule',
    topic_name='GuardDutyFindingTopic',
    slack_team_id='YYYYYYYYY',
    slack_channel_id='XXXXXXXX',
):
    """Wire GuardDuty findings through an events rule and SNS topic to Slack.

    Steps, in order: enable GuardDuty, register the events rule, obtain the
    SNS topic, create the Chatbot Slack configuration, then attach the topic
    as the rule's target. Any exception is logged and ends the process with
    exit status 1.

    Args:
        rule_name: Name of the events rule to register.
        topic_name: Name of the SNS topic to find or create.
        slack_team_id: Slack workspace ID (the default is a placeholder).
        slack_channel_id: Slack channel ID (the default is a placeholder).
    """
    try:
        # Configure GuardDuty
        detector_id = enable_guardduty()
        log.info(f"GuardDuty enabled with DetectorId: {detector_id}")
        # Configure CloudWatch Events
        rule_arn = configure_cloudwatch_events(rule_name)
        log.info(f"CloudWatch Events rule configured with ARN: {rule_arn}")
        # Configure SNS Topic
        topic_arn = configure_sns_topic(topic_name)
        log.info(f"SNS Topic configured with ARN: {topic_arn}")
        # Connect SNS Topic to Slack through Chatbot
        slack_config_arn = connect_sns_to_slack(topic_arn, slack_team_id, slack_channel_id)
        log.info(f"Slack configuration created with ARN: {slack_config_arn}")
        # Target Cloudwatch to SNS topic
        success = create_cloudwatch_event_rule(rule_name, topic_arn)
        if success:
            log.info("CloudWatch Event rule created successfully.")
        else:
            # A failed target attachment is an error, not an informational event
            log.error("Failed to create CloudWatch Event rule.")
    except Exception as ex:
        log.error(f"*** {ex}")
        sys.exit(1)

I get this exception when the chatbot.create_slack_channel_configuration method is called:

*** Could not connect to the endpoint URL: "https://chatbot.us-east-1.amazonaws.com/create-slack-channel-configuration"

Everything up to this point is working just fine, I believe. All the services are successfully created and I can verify them through the AWS console. I also tried to configure the Slack client via the AWS console, and it does work that way. I receive notifications on my Slack channel, which is a positive outcome. However, I need to automate all of these tasks programmatically through the Python script.

I'm not sure whether this approach is correct or whether I'm missing anything; I'm new to AWS. Any help or suggestions would be greatly appreciated. Thanks in advance!

1

There is 1 best solution below

1
On

Chances are this is due to the fact that AWS Chatbot is a global service that doesn't accept any region.

I see you have boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1, which sets us-east-1 as the default region and causes all boto3 clients to use regional endpoints. However, AWS Chatbot doesn't have a regional endpoint, only a global one.

If you set the region some other way then it would work:

# Pin the region on each regional client instead of on the default session.
guardduty = boto3.client('guardduty', region_name='us-east-1')
events = boto3.client('events', region_name='us-east-1')
sns = boto3.client('sns', region_name='us-east-1')
# NOTE(review): with no region_name, boto3 presumably falls back to whatever
# region the environment or shared config supplies — confirm that the
# resolved region actually serves AWS Chatbot.
chatbot = boto3.client('chatbot')  # don't set a region here