SIGN IN SIGN UP

Welcome to the AWS Code Examples Repository. This repo contains code examples used in the AWS documentation, AWS SDK Developer Guides, and more. For more information, see the Readme.md file below.

0 0 1 Java
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from pprint import pp
from typing import Any, Dict, Optional
import boto3
from botocore.exceptions import ClientError, WaiterError
logger = logging.getLogger(__name__)


# snippet-start:[python.example_code.ec2.SecurityGroupWrapper.class]
# snippet-start:[python.example_code.ec2.SecurityGroupWrapper.decl]
class SecurityGroupWrapper:
    """Encapsulates Amazon Elastic Compute Cloud (Amazon EC2) security group actions."""

    def __init__(self, ec2_client: Any, security_group: Optional[str] = None):
        """
        Initializes the SecurityGroupWrapper with an EC2 client and an optional security group ID.

        :param ec2_client: A Boto3 Amazon EC2 client, such as the object returned by
                           `boto3.client("ec2")`. It is annotated as `Any` because
                           `boto3.client` is a factory function, not a type.
        :param security_group: The ID of the security group to manage, or None when no
                               group has been created or selected yet.
        """
        self.ec2_client = ec2_client
        self.security_group = security_group

    @classmethod
    def from_client(cls) -> "SecurityGroupWrapper":
        """
        Creates a SecurityGroupWrapper instance with a default EC2 client.

        :return: An instance of SecurityGroupWrapper initialized with the default EC2 client
                 and no security group ID.
        """
        return cls(boto3.client("ec2"))

    # snippet-end:[python.example_code.ec2.SecurityGroupWrapper.decl]

    # snippet-start:[python.example_code.ec2.CreateSecurityGroup]
    def create(self, group_name: str, group_description: str) -> str:
        """
        Creates a security group in the default virtual private cloud (VPC) of the current account.

        On success, the new group's ID is also stored in `self.security_group` so that
        later calls (for example `authorize_ingress`) operate on it.

        :param group_name: The name of the security group to create.
        :param group_description: The description of the security group to create.
        :return: The ID of the newly created security group.
        :raises ClientError: If the service rejects the request. A duplicate group name
                             is logged with a specific message before the error is re-raised.
        """
        try:
            response = self.ec2_client.create_security_group(
                GroupName=group_name, Description=group_description
            )
        except ClientError as err:
            # Amazon EC2 reports a name collision as "InvalidGroup.Duplicate". The
            # "ResourceAlreadyExists" code is still matched for backward compatibility.
            if err.response["Error"]["Code"] in (
                "InvalidGroup.Duplicate",
                "ResourceAlreadyExists",
            ):
                logger.error(
                    f"Security group '{group_name}' already exists. Please choose a different name."
                )
            raise
        else:
            self.security_group = response["GroupId"]
            return self.security_group

    # snippet-end:[python.example_code.ec2.CreateSecurityGroup]

    # snippet-start:[python.example_code.ec2.AuthorizeSecurityGroupIngress]
    def authorize_ingress(self, ssh_ingress_ip: str) -> Optional[Dict[str, Any]]:
        """
        Adds a rule to the security group to allow access to SSH.

        :param ssh_ingress_ip: The IP address that is granted inbound access to connect
                               to port 22 over TCP, used for SSH. A bare address is
                               restricted to that single host (a /32 mask is appended);
                               a value that already contains a CIDR suffix is used as is.
        :return: The response to the authorization request. The 'Return' field of the
                 response indicates whether the request succeeded or failed. Returns
                 None if no security group is set on this wrapper.
        :raises ClientError: If the service rejects the request. A duplicate rule
                             (InvalidPermission.Duplicate) is logged with a specific
                             message before the error is re-raised.
        """
        if self.security_group is None:
            logger.info("No security group to update.")
            return None

        # Appending "/32" to a value that is already a CIDR block would produce an
        # invalid range such as "10.0.0.0/24/32", so only bare addresses get the mask.
        cidr_ip = ssh_ingress_ip if "/" in ssh_ingress_ip else f"{ssh_ingress_ip}/32"
        ip_permissions = [
            {
                # SSH ingress open to only the specified address range.
                "IpProtocol": "tcp",
                "FromPort": 22,
                "ToPort": 22,
                "IpRanges": [{"CidrIp": cidr_ip}],
            }
        ]
        try:
            response = self.ec2_client.authorize_security_group_ingress(
                GroupId=self.security_group, IpPermissions=ip_permissions
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "InvalidPermission.Duplicate":
                logger.error(
                    f"The SSH ingress rule for IP {ssh_ingress_ip} already exists "
                    f"in security group '{self.security_group}'."
                )
            raise
        else:
            return response

    # snippet-end:[python.example_code.ec2.AuthorizeSecurityGroupIngress]

    # snippet-start:[python.example_code.ec2.DescribeSecurityGroups]
    def describe(self, security_group_id: Optional[str] = None) -> bool:
        """
        Displays information about the specified security group or all security groups if no ID is provided.

        The name, ID, and VPC of each group are printed to standard output, followed by
        its inbound permissions when it has any.

        :param security_group_id: The ID of the security group to describe.
                                  If None, an open search is performed to describe all security groups.
        :return: True if the description is successful.
        :raises ClientError: If there is an error describing the security group(s),
                             such as an invalid security group ID.
        """
        # Omitting GroupIds asks the service for every security group.
        paginate_args = {} if security_group_id is None else {"GroupIds": [security_group_id]}
        try:
            paginator = self.ec2_client.get_paginator("describe_security_groups")
            # Iterating the paginator performs the service calls, so the loop
            # has to stay inside the try block.
            for page in paginator.paginate(**paginate_args):
                for security_group in page["SecurityGroups"]:
                    print(f"Security group: {security_group['GroupName']}")
                    print(f"\tID: {security_group['GroupId']}")
                    print(f"\tVPC: {security_group['VpcId']}")
                    if security_group["IpPermissions"]:
                        print("Inbound permissions:")
                        pp(security_group["IpPermissions"])
        except ClientError as err:
            logger.error("Failed to describe security group(s).")
            if err.response["Error"]["Code"] == "InvalidGroup.NotFound":
                logger.error(
                    f"Security group {security_group_id} does not exist "
                    f"because the specified security group ID was not found."
                )
            raise
        else:
            return True

    # snippet-end:[python.example_code.ec2.DescribeSecurityGroups]

    # snippet-start:[python.example_code.ec2.DeleteSecurityGroup]
    def delete(self, security_group_id: str) -> bool:
        """
        Deletes the specified security group.

        The ID stored in `self.security_group` is left unchanged, so callers can
        still read it after the deletion (for example, to report what was removed).

        :param security_group_id: The ID of the security group to delete. Required.
        :return: True if the deletion is successful.
        :raises ClientError: If the security group cannot be deleted due to an AWS service
                             error. Missing groups and groups that are still in use are
                             logged with specific messages before the error is re-raised.
        """
        try:
            self.ec2_client.delete_security_group(GroupId=security_group_id)
        except ClientError as err:
            logger.error(f"Deletion failed for security group '{security_group_id}'")
            error_code = err.response["Error"]["Code"]
            if error_code == "InvalidGroup.NotFound":
                logger.error(
                    f"Security group '{security_group_id}' cannot be deleted because it does not exist."
                )
            elif error_code == "DependencyViolation":
                logger.error(
                    f"Security group '{security_group_id}' cannot be deleted because it is still in use."
                    " Verify that it is:"
                    "\n\t- Detached from resources"
                    "\n\t- Removed from references in other groups"
                    "\n\t- Removed from VPC's as a default group"
                )
            raise
        else:
            logger.info(f"Successfully deleted security group '{security_group_id}'")
            return True

    # snippet-end:[python.example_code.ec2.DeleteSecurityGroup]


# snippet-end:[python.example_code.ec2.SecurityGroupWrapper.class]