Voiced by Amazon Polly |
Introduction
In this blog, we will explore how to send notifications to a Slack channel whenever changes are made to security groups. We will achieve this using AWS Lambda, Amazon EventBridge, and a CloudFormation template.
Pioneers in Cloud Consulting & Migration Services
- Reduced infrastructural costs
- Accelerated application deployment
Solution Overview
The solution consists of:
- An AWS Lambda function that processes security group modification events and sends a notification to Slack.
- An Amazon EventBridge rule that captures specific security group change events and triggers the Lambda function.
- An Amazon Systems Manager (SSM) Parameter Store entry that stores the Slack Webhook URL securely.
- AWS CloudFormation template to automate the deployment of these components.
Setting Up the Slack Notification Bot
You must set up a Slack bot with a webhook URL to send messages to a Slack channel. You can follow the guide provided in this reference to create a Slack webhook and obtain the required URL.
AWS CloudFormation Template
The following AWS CloudFormation template sets up the required AWS resources:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: CloudFormation template to create setup for sending notifications on security group changes. Parameters: SlackWebhookSSMParam: Type: String Default: "/CFN/slack/webhook/url" Description: "SSM Parameter Name for Slack Webhook URL" Resources: LambdaExecutionRole: Type: 'AWS::IAM::Role' Properties: RoleName: "SecurityGroupNotificationRole" AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: 'Allow' Action: 'sts:AssumeRole' Principal: Service: 'lambda.amazonaws.com' ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" Policies: - PolicyName: 'LambdaExecutionPolicy' PolicyDocument: Version: '2012-10-17' Statement: - Effect: 'Allow' Action: - 'ec2:DescribeTags' Resource: '*' SecurityGroupEventLambda: Type: 'AWS::Serverless::Function' Properties: FunctionName: "SecurityGroupEventNotifier" CodeUri: lambdaFunction/ Handler: 'app.lambda_handler' Runtime: "python3.12" Timeout: 60 Role: !GetAtt LambdaExecutionRole.Arn Environment: Variables: SLACK_WEBHOOK_URL: !Sub '{{resolve:ssm:${SlackWebhookSSMParam}}}' SecurityGroupEventRule: Type: 'AWS::Events::Rule' Properties: Name: 'SecurityGroupEventRule' EventPattern: source: - 'aws.ec2' detail-type: - 'AWS API Call via CloudTrail' detail: eventSource: - 'ec2.amazonaws.com' eventName: - 'AuthorizeSecurityGroupIngress' - 'AuthorizeSecurityGroupEgress' - 'RevokeSecurityGroupIngress' - 'RevokeSecurityGroupEgress' - 'CreateSecurityGroup' - 'DeleteSecurityGroup' Targets: - Arn: !GetAtt SecurityGroupEventLambda.Arn Id: 'SecurityGroupEventLambdaTarget' LambdaInvokePermission: Type: 'AWS::Lambda::Permission' Properties: Action: 'lambda:InvokeFunction' FunctionName: !Ref SecurityGroupEventLambda Principal: 'events.amazonaws.com' SourceArn: !GetAtt SecurityGroupEventRule.Arn Outputs: LambdaFunctionArn: Description: 'Lambda Function ARN' Value: !GetAtt SecurityGroupEventLambda.Arn |
AWS Lambda Function Code
The AWS Lambda function processes security group change events and sends notifications to Slack.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
import json import boto3 import gzip import base64 import urllib.request import os import re # Initialize AWS client ec2_client = boto3.client("ec2") cf_client = boto3.client("cloudformation") cloudtrail_client = boto3.client("cloudtrail") codepipeline_client = boto3.client("codepipeline") # Slack webhook URL (Set this as an environment variable in Lambda) SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"] def send_to_slack(message): """Send formatted message to Slack channel.""" slack_message = {"text": message} encoded_msg = json.dumps(slack_message).encode("utf-8") req = urllib.request.Request( SLACK_WEBHOOK_URL, data=encoded_msg, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req) as response: return response.getcode() def extract_user_info(user_arn): """Extract Role and User from ARN.""" parts = user_arn.split("/") if len(parts) >= 2: role = parts[-2] user = parts[-1] else: role = "Unknown Role" user = "Unknown User" return role, user def get_stack_info(security_group_id): """Retrieve StackName, determine if it's a root or nested stack, and fetch last updated time.""" try: response = ec2_client.describe_tags( Filters=[{"Name": "resource-id", "Values": [security_group_id]}] ) stack_name = "N/A" stack_id = None root_stack_id = None is_nested_stack = False updated_time = "N/A" pipeline_name = None reviewer_arn = "N/A" for tag in response.get("Tags", []): if tag["Key"] == "StackName": stack_name = tag["Value"] elif tag["Key"] == "aws:cloudformation:stack-id": stack_id = tag["Value"] if stack_id: try: stack_response = cf_client.describe_stacks(StackName=stack_id) stack_details = stack_response["Stacks"][0] # If 'RootId' exists, it's a nested stack if "RootId" in stack_details: is_nested_stack = True root_stack_id = stack_details["RootId"] else: root_stack_id = stack_id # If no RootId, the stack itself is root # Get the last updated time updated_time = stack_details.get("LastUpdatedTime", "N/A") # Fetch tags from root stack if root_stack_id: pipeline_name = get_pipeline_name_from_root_stack(root_stack_id) if pipeline_name: reviewer_summary = get_last_approval_summary(pipeline_name) except Exception as e: print(f"Error describing CloudFormation stack {stack_id}: {e}") return stack_name, stack_id, is_nested_stack, root_stack_id, updated_time, pipeline_name, reviewer_summary except Exception as e: print(f"Error fetching stack info for {security_group_id}: {e}") return "N/A", None, False, None, "N/A", None, "N/A" def get_pipeline_name_from_root_stack(root_stack_id): """Fetch PipelineName tag from the root stack.""" try: response = cf_client.describe_stacks(StackName=root_stack_id) stack_tags = response["Stacks"][0].get("Tags", []) for tag in stack_tags: if tag["Key"] == "PipelineName": return tag["Value"] except Exception as e: print(f"Error fetching PipelineName from root stack {root_stack_id}: {e}") return None def get_last_approval_summary(pipeline_name): """Fetch the last approval summary from the pipeline's approval stage.""" try: response = codepipeline_client.get_pipeline_state(name=pipeline_name) for stage in response.get("stageStates", []): if stage.get("stageName") == "Approval": # Adjust stage name if needed for action_state in stage.get("actionStates", []): if action_state.get("actionName") == "ManualApprove": # Adjust action name if needed latest_execution = action_state.get("latestExecution", {}) summary = latest_execution.get("summary", "No summary available") # Remove the 'Approved by' part and extract the email match = re.search(r'Approved by arn:aws:sts::\d+:assumed-role/[^/]+/([^/]+)', summary) if match: # Return just the email without "Approved by" return match.group(1) # Returns the email part return summary except Exception as e: print(f"Error fetching approval summary for pipeline {pipeline_name}: {e}") return "No summary available" def parse_event(event_data): """Extract meaningful information from CloudTrail log.""" message = "" event_name = event_data.get("eventName", "") user_arn = event_data.get("userIdentity", {}).get("arn", "Unknown User") role, user = extract_user_info(user_arn) security_group_id = event_data.get("requestParameters", {}).get("groupId", "N/A") # Fetch Stack Name, root stack info, updated time, pipeline, and reviewer summary stack_info = "" if security_group_id != "N/A": if user == "AWSCloudFormation": stack_name, stack_id, is_nested, root_stack_id, updated_time, pipeline_name, reviewer_summary = get_stack_info(security_group_id) stack_info = f"\n Stack Name: {stack_name}" if pipeline_name: stack_info += f"\n Pipeline Name: {pipeline_name}" if reviewer_summary: stack_info += f"\n Pipeline Approved By: {reviewer_summary}" if event_name == "CreateSecurityGroup": sg_name = event_data["requestParameters"]["groupName"] vpc_id = event_data["requestParameters"]["vpcId"] message = ( f"🚀 Security Group Created\n" f"SG Name: {sg_name}\n" f"SG ID: {security_group_id}\n" f"VPC: {vpc_id}\n" f"Role: {role}\n" f"User: {user}" f"{stack_info}" ) elif event_name in ["AuthorizeSecurityGroupIngress", "AuthorizeSecurityGroupEgress"]: ip_permissions = event_data["requestParameters"]["ipPermissions"]["items"] rules = [] for perm in ip_permissions: protocol = perm.get("ipProtocol", "N/A") from_port = perm.get("fromPort", "N/A") to_port = perm.get("toPort", "N/A") cidr_blocks_v4 = perm.get("ipRanges", {}).get("items", []) cidr_blocks_v6 = perm.get("ipv6Ranges", {}).get("items", []) # Fix for IPv6 sg_references = perm.get("groups", {}).get("items", []) alert_emoji = "" # Initialize to prevent UnboundLocalError if sg_references: referenced_sg = sg_references[0]["groupId"] rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nReferenced SG: {referenced_sg}" elif cidr_blocks_v4 or cidr_blocks_v6: cidr_ip_v4 = cidr_blocks_v4[0].get("cidrIp") if cidr_blocks_v4 else None cidr_ip_v6 = cidr_blocks_v6[0].get("cidrIpv6") if cidr_blocks_v6 else None if cidr_ip_v4 == "0.0.0.0/0": rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: 🚨🚨 {cidr_ip_v4} 🚨🚨" elif cidr_ip_v4: rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: {cidr_ip_v4}" if cidr_ip_v6 == "::/0": rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: 🚨🚨 {cidr_ip_v6} 🚨🚨" elif cidr_ip_v6: rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: {cidr_ip_v6}" else: rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nNo CIDR or SG Reference" rules.append(rule_info) action = "Ingress" if event_name == "AuthorizeSecurityGroupIngress" else "Egress" message = ( f"✅ Security Group Rule Added\n" f"SG ID: {security_group_id}\n" f"Action: {action}\n" f"{chr(10).join(rules)}\n" f"Role: {role}\n" f"User: {user}" f"{stack_info}" ) elif event_name in ["RevokeSecurityGroupIngress", "RevokeSecurityGroupEgress"]: ip_permissions = event_data["requestParameters"].get("ipPermissions", {}).get("items", []) rules = [] for perm in ip_permissions: protocol = perm.get("ipProtocol", "N/A") from_port = perm.get("fromPort", "N/A") to_port = perm.get("toPort", "N/A") cidr_blocks_v4 = perm.get("ipRanges", {}).get("items", []) cidr_blocks_v6 = perm.get("ipv6Ranges", {}).get("items", []) # Fix for IPv6 sg_references = perm.get("groups", {}).get("items", []) alert_emoji = "" # Initialize to prevent UnboundLocalError if sg_references: referenced_sg = sg_references[0]["groupId"] rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nReferenced SG: {referenced_sg}" elif cidr_blocks_v4 or cidr_blocks_v6: cidr_ip_v4 = cidr_blocks_v4[0].get("cidrIp") if cidr_blocks_v4 else None cidr_ip_v6 = cidr_blocks_v6[0].get("cidrIpv6") if cidr_blocks_v6 else None if cidr_ip_v4 == "0.0.0.0/0": rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: 🚨🚨 {cidr_ip_v4} 🚨🚨" elif cidr_ip_v4: rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: {cidr_ip_v4}" if cidr_ip_v6 == "::/0": rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: 🚨🚨 {cidr_ip_v6} 🚨🚨" elif cidr_ip_v6: rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nCIDR: {cidr_ip_v6}" else: rule_info = f"Protocol: {protocol} \nPort: {from_port}-{to_port} \nNo CIDR or SG Reference" rules.append(rule_info) action = "Ingress" if event_name == "RevokeSecurityGroupIngress" else "Egress" message = ( f"⚠️ Security Group Rule Removed\n" f"SG ID: {security_group_id}\n" f"Action: {action}\n" f"{chr(10).join(rules)}\n" f"Role: {role}\n" f"User: {user}" f"{stack_info}" ) return message def lambda_handler(event, context): print("Received event:", json.dumps(event, indent=2)) # Check if it's an EventBridge event if "detail" in event: event_data = event["detail"] print("Processing EventBridge event:", event_data) message = parse_event(event_data) if message: response_code = send_to_slack(message) print(f"Sent to Slack, Response: {response_code}") else: print("No relevant security group event detected.") return {"statusCode": 200, "body": "Processed EventBridge event"} # Handle unknown event structure print("Unknown event format:", event) return {"statusCode": 400, "body": "Unknown event format"} |
Conclusion
This solution provides a way to monitor security group changes in AWS and instantly notify your team via Slack. By using AWS Lambda and Amazon EventBridge, you can automate security monitoring without manual intervention. You can enhance the function further by adding logging and exception handling or integrating it with other security tools.
Drop a query if you have any questions regarding Slack and we will get back to you quickly.
Making IT Networks Enterprise-ready – Cloud Management Services
- Accelerated cloud migration
- End-to-end view of the cloud environment
About CloudThat
CloudThat is a leading provider of Cloud Training and Consulting services with a global presence in India, the USA, Asia, Europe, and Africa. Specializing in AWS, Microsoft Azure, GCP, VMware, Databricks, and more, the company serves mid-market and enterprise clients, offering comprehensive expertise in Cloud Migration, Data Platforms, DevOps, IoT, AI/ML, and more.
CloudThat is the first Indian Company to win the prestigious Microsoft Partner 2024 Award and is recognized as a top-tier partner with AWS and Microsoft, including the prestigious ‘Think Big’ partner award from AWS and the Microsoft Superstars FY 2023 award in Asia & India. Having trained 650k+ professionals in 500+ cloud certifications and completed 300+ consulting projects globally, CloudThat is an official AWS Advanced Consulting Partner, Microsoft Gold Partner, AWS Training Partner, AWS Migration Partner, AWS Data and Analytics Partner, AWS DevOps Competency Partner, AWS GenAI Competency Partner, Amazon QuickSight Service Delivery Partner, Amazon EKS Service Delivery Partner, AWS Microsoft Workload Partners, Amazon EC2 Service Delivery Partner, Amazon ECS Service Delivery Partner, AWS Glue Service Delivery Partner, Amazon Redshift Service Delivery Partner, AWS Control Tower Service Delivery Partner, AWS WAF Service Delivery Partner, Amazon CloudFront, Amazon OpenSearch, AWS DMS, AWS Systems Manager, Amazon RDS, and many more.
FAQs
1. Can I use Amazon SNS instead of Slack for notifications?
ANS: – Yes, you can modify the AWS Lambda function to publish messages to an Amazon SNS topic instead of sending them to Slack.
2. What happens if the Slack webhook URL is incorrect?
ANS: – If the webhook URL is invalid, the AWS Lambda function will fail when attempting to send a message, and the error logs will be available in Amazon CloudWatch.

WRITTEN BY Deepak S
Deepak S works as a Research Intern at CloudThat. His expertise lies in AWS's services. Deepak is good at haunting new technologies and automobile enthusiasts.
Comments