2022-10-11 15:26:03 -07:00
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from pprint import pp
2024-09-05 12:39:58 -04:00
from typing import Any , Dict , Optional
2022-10-11 15:26:03 -07:00
import boto3
2024-09-05 12:39:58 -04:00
from botocore . exceptions import ClientError , WaiterError
2022-10-11 15:26:03 -07:00
logger = logging . getLogger ( __name__ )
# snippet-start:[python.example_code.ec2.SecurityGroupWrapper.class]
# snippet-start:[python.example_code.ec2.SecurityGroupWrapper.decl]
class SecurityGroupWrapper :
""" Encapsulates Amazon Elastic Compute Cloud (Amazon EC2) security group actions. """
2023-10-18 10:35:05 -07:00
2024-09-05 12:39:58 -04:00
def __init__ ( self , ec2_client : boto3 . client , security_group : Optional [ str ] = None ) :
2022-10-11 15:26:03 -07:00
"""
2024-09-05 12:39:58 -04:00
Initializes the SecurityGroupWrapper with an EC2 client and an optional security group ID.
:param ec2_client: A Boto3 Amazon EC2 client. This client provides low-level
access to AWS EC2 services.
:param security_group: The ID of a security group to manage. This is a high-level identifier
that represents the security group.
2022-10-11 15:26:03 -07:00
"""
2024-09-05 12:39:58 -04:00
self . ec2_client = ec2_client
2022-10-11 15:26:03 -07:00
self . security_group = security_group
@classmethod
2024-09-05 12:39:58 -04:00
def from_client ( cls ) - > " SecurityGroupWrapper " :
"""
Creates a SecurityGroupWrapper instance with a default EC2 client.
:return: An instance of SecurityGroupWrapper initialized with the default EC2 client.
"""
ec2_client = boto3 . client ( " ec2 " )
return cls ( ec2_client )
2023-10-18 10:35:05 -07:00
# snippet-end:[python.example_code.ec2.SecurityGroupWrapper.decl]
2022-10-11 15:26:03 -07:00
# snippet-start:[python.example_code.ec2.CreateSecurityGroup]
2024-09-05 12:39:58 -04:00
def create ( self , group_name : str , group_description : str ) - > str :
2022-10-11 15:26:03 -07:00
"""
2024-09-05 12:39:58 -04:00
Creates a security group in the default virtual private cloud (VPC) of the current account.
2022-10-11 15:26:03 -07:00
:param group_name: The name of the security group to create.
:param group_description: The description of the security group to create.
2024-09-05 12:39:58 -04:00
:return: The ID of the newly created security group.
:raise Handles AWS SDK service-level ClientError, with special handling for ResourceAlreadyExists
2022-10-11 15:26:03 -07:00
"""
try :
2024-09-05 12:39:58 -04:00
response = self . ec2_client . create_security_group (
2023-10-18 10:35:05 -07:00
GroupName = group_name , Description = group_description
)
2024-09-05 12:39:58 -04:00
self . security_group = response [ " GroupId " ]
2022-10-11 15:26:03 -07:00
except ClientError as err :
2024-09-05 12:39:58 -04:00
if err . response [ " Error " ] [ " Code " ] == " ResourceAlreadyExists " :
logger . error (
f " Security group ' { group_name } ' already exists. Please choose a different name. "
)
2022-10-11 15:26:03 -07:00
raise
else :
return self . security_group
2023-10-18 10:35:05 -07:00
2022-10-11 15:26:03 -07:00
# snippet-end:[python.example_code.ec2.CreateSecurityGroup]
# snippet-start:[python.example_code.ec2.AuthorizeSecurityGroupIngress]
2024-09-05 12:39:58 -04:00
def authorize_ingress ( self , ssh_ingress_ip : str ) - > Optional [ Dict [ str , Any ] ] :
2022-10-11 15:26:03 -07:00
"""
2022-10-12 15:02:52 -07:00
Adds a rule to the security group to allow access to SSH.
2022-10-11 15:26:03 -07:00
:param ssh_ingress_ip: The IP address that is granted inbound access to connect
to port 22 over TCP, used for SSH.
:return: The response to the authorization request. The ' Return ' field of the
2024-09-05 12:39:58 -04:00
response indicates whether the request succeeded or failed, or None if no security group is set.
:raise Handles AWS SDK service-level ClientError, with special handling for ResourceAlreadyExists
2022-10-11 15:26:03 -07:00
"""
if self . security_group is None :
logger . info ( " No security group to update. " )
2024-09-05 12:39:58 -04:00
return None
2022-10-11 15:26:03 -07:00
try :
2023-10-18 10:35:05 -07:00
ip_permissions = [
{
# SSH ingress open to only the specified IP address.
" IpProtocol " : " tcp " ,
" FromPort " : 22 ,
" ToPort " : 22 ,
" IpRanges " : [ { " CidrIp " : f " { ssh_ingress_ip } /32 " } ] ,
}
]
2024-09-05 12:39:58 -04:00
response = self . ec2_client . authorize_security_group_ingress (
GroupId = self . security_group , IpPermissions = ip_permissions
2023-10-18 10:35:05 -07:00
)
2022-10-11 15:26:03 -07:00
except ClientError as err :
2024-09-05 12:39:58 -04:00
if err . response [ " Error " ] [ " Code " ] == " InvalidPermission.Duplicate " :
logger . error (
f " The SSH ingress rule for IP { ssh_ingress_ip } already exists "
f " in security group ' { self . security_group } ' . "
)
2022-10-11 15:26:03 -07:00
raise
else :
return response
2023-10-18 10:35:05 -07:00
2022-10-11 15:26:03 -07:00
# snippet-end:[python.example_code.ec2.AuthorizeSecurityGroupIngress]
# snippet-start:[python.example_code.ec2.DescribeSecurityGroups]
2024-09-05 12:39:58 -04:00
def describe ( self , security_group_id : Optional [ str ] = None ) - > bool :
2022-10-11 15:26:03 -07:00
"""
2024-09-05 12:39:58 -04:00
Displays information about the specified security group or all security groups if no ID is provided.
2022-10-11 15:26:03 -07:00
2024-09-05 12:39:58 -04:00
:param security_group_id: The ID of the security group to describe.
If None, an open search is performed to describe all security groups.
:returns: True if the description is successful.
:raises ClientError: If there is an error describing the security group(s), such as an invalid security group ID.
"""
2022-10-11 15:26:03 -07:00
try :
2024-09-05 12:39:58 -04:00
paginator = self . ec2_client . get_paginator ( " describe_security_groups " )
if security_group_id is None :
# If no ID is provided, return all security groups.
page_iterator = paginator . paginate ( )
else :
page_iterator = paginator . paginate ( GroupIds = [ security_group_id ] )
for page in page_iterator :
for security_group in page [ " SecurityGroups " ] :
print ( f " Security group: { security_group [ ' GroupName ' ] } " )
print ( f " \t ID: { security_group [ ' GroupId ' ] } " )
print ( f " \t VPC: { security_group [ ' VpcId ' ] } " )
if security_group [ " IpPermissions " ] :
print ( " Inbound permissions: " )
pp ( security_group [ " IpPermissions " ] )
return True
2022-10-11 15:26:03 -07:00
except ClientError as err :
2024-09-05 12:39:58 -04:00
logger . error ( " Failed to describe security group(s). " )
if err . response [ " Error " ] [ " Code " ] == " InvalidGroup.NotFound " :
logger . error (
f " Security group { security_group_id } does not exist "
f " because the specified security group ID was not found. "
)
2022-10-11 15:26:03 -07:00
raise
2023-10-18 10:35:05 -07:00
2022-10-11 15:26:03 -07:00
# snippet-end:[python.example_code.ec2.DescribeSecurityGroups]
# snippet-start:[python.example_code.ec2.DeleteSecurityGroup]
2024-09-05 12:39:58 -04:00
def delete ( self , security_group_id : str ) - > bool :
2022-10-11 15:26:03 -07:00
"""
2024-09-05 12:39:58 -04:00
Deletes the specified security group.
:param security_group_id: The ID of the security group to delete. Required.
2022-10-11 15:26:03 -07:00
2024-09-05 12:39:58 -04:00
:returns: True if the deletion is successful.
:raises ClientError: If the security group cannot be deleted due to an AWS service error.
"""
2022-10-11 15:26:03 -07:00
try :
2024-09-05 12:39:58 -04:00
self . ec2_client . delete_security_group ( GroupId = security_group_id )
logger . info ( f " Successfully deleted security group ' { security_group_id } ' " )
return True
2022-10-11 15:26:03 -07:00
except ClientError as err :
2024-09-05 12:39:58 -04:00
logger . error ( f " Deletion failed for security group ' { security_group_id } ' " )
error_code = err . response [ " Error " ] [ " Code " ]
if error_code == " InvalidGroup.NotFound " :
logger . error (
f " Security group ' { security_group_id } ' cannot be deleted because it does not exist. "
)
elif error_code == " DependencyViolation " :
logger . error (
f " Security group ' { security_group_id } ' cannot be deleted because it is still in use. "
" Verify that it is: "
" \n \t - Detached from resources "
" \n \t - Removed from references in other groups "
" \n \t - Removed from VPC ' s as a default group "
)
2022-10-11 15:26:03 -07:00
raise
2023-10-18 10:35:05 -07:00
2022-10-11 15:26:03 -07:00
# snippet-end:[python.example_code.ec2.DeleteSecurityGroup]
2023-10-18 10:35:05 -07:00
2022-10-11 15:26:03 -07:00
# snippet-end:[python.example_code.ec2.SecurityGroupWrapper.class]