SIGN IN SIGN UP

Welcome to the AWS Code Examples Repository. This repo contains code examples used in the AWS documentation, AWS SDK Developer Guides, and more. For more information, see the Readme.md file below.

0 0 46 Java
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Shows how to use the AWS SDK for Python (Boto3) with the Amazon Bedrock Agents Runtime
to manage versions of an Amazon Bedrock flow.
"""
import logging
from botocore.exceptions import ClientError
# Create a logger instance
logging.basicConfig(
level=logging.INFO
)
logger = logging.getLogger(__name__)
# snippet-start:[python.example_code.bedrock-agent.create_flow_version]
def create_flow_version(client, flow_id, description):
"""
Creates a version of an Amazon Bedrock flow.
Args:
client: Amazon Bedrock agent boto3 client.
flow_id (str): The identifier of the flow.
description (str) : A description for the flow.
Returns:
str: The version for the flow.
"""
try:
logger.info("Creating flow version for flow: %s.", flow_id)
# Call CreateFlowVersion operation
response = client.create_flow_version(
flowIdentifier=flow_id,
description=description
)
logging.info("Successfully created flow version %s for flow %s.",
response['version'], flow_id)
return response['version']
except ClientError as e:
logging.exception("Client error creating flow: %s", str(e))
raise
except Exception as e:
logging.exception("Unexpected error creating flow : %s", str(e))
raise
# snippet-end:[python.example_code.bedrock-agent.create_flow_version]
# snippet-start:[python.example_code.bedrock-agent.get_flow_version]
def get_flow_version(client, flow_id, flow_version):
"""
Gets information about a version of an Amazon Bedrock flow.
Args:
client: Amazon Bedrock agent boto3 client.
flow_id (str): The identifier of the flow.
flow_version (str): The flow version of the flow.
Returns:
dict: The response from the call to GetFlowVersion.
"""
try:
logger.info("Deleting flow version for flow: %s.", flow_id)
# Call GetFlowVersion operation
response = client.get_flow_version(
flowIdentifier=flow_id,
flowVersion=flow_version
)
logging.info("Successfully got flow version %s information for flow %s.",
flow_version,
flow_id)
return response
except ClientError as e:
logging.exception("Client error getting flow version: %s", str(e))
raise
except Exception as e:
logging.exception("Unexpected error getting flow version: %s", str(e))
raise
# snippet-end:[python.example_code.bedrock-agent.get_flow_version]
# snippet-start:[python.example_code.bedrock-agent.delete_flow_version]
def delete_flow_version(client, flow_id, flow_version):
"""
Deletes a version of an Amazon Bedrock flow.
Args:
client: Amazon Bedrock agent boto3 client.
flow_id (str): The identifier of the flow.
Returns:
dict: The response from DeleteFlowVersion.
"""
try:
logger.info("Deleting flow version %s for flow: %s.",flow_version, flow_id)
# Call DeleteFlowVersion operation
response = client.delete_flow_version(
flowIdentifier=flow_id,
flowVersion=flow_version
)
logging.info("Successfully deleted flow version %s for %s.",
flow_version,
flow_id)
return response
except ClientError as e:
logging.exception("Client error deleting flow version: %s ", str(e))
raise
except Exception as e:
logging.exception("Unexpected deleting flow version: %s", str(e))
raise
# snippet-end:[python.example_code.bedrock-agent.delete_flow_version]
# snippet-start:[python.example_code.bedrock-agent.list_flow_versions]
def list_flow_versions(client, flow_id):
"""
Lists the versions of an Amazon Bedrock flow.
Args:
client: Amazon bedrock agent boto3 client.
flow_id (str): The identifier of the flow.
Returns:
dict: The response from ListFlowVersions.
"""
try:
finished = False
logger.info("Listing flow versions for flow: %s.", flow_id)
response = client.list_flow_versions(
flowIdentifier=flow_id,
maxResults=10)
while finished is False:
print(f"Versions for flow:{flow_id}")
for version in response['flowVersionSummaries']:
print(f"Version: {version['version']}")
print(f"Status: {version['status']}\n")
if 'nextToken' in response:
next_token = response['nextToken']
response = client.list_flow_versions(maxResults=10,
nextToken=next_token)
else:
finished = True
logging.info("Successfully listed flow versions for flow %s.",
flow_id)
return response
except ClientError as e:
logging.exception("Client error listing flow versions: %s", str(e))
raise
except Exception as e:
logging.exception("Unexpected error listing flow versions: %s", str(e))
raise
# snippet-end:[python.example_code.bedrock-agent.list_flow_versions]