SIGN IN SIGN UP

Welcome to the AWS Code Examples Repository. This repo contains code examples used in the AWS documentation, AWS SDK Developer Guides, and more. For more information, see the Readme.md file below.

0 0 10 Java
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Purpose
Shows how to use the AWS SDK for Python (Boto3) with AWS Step Functions to
create and manage activities. An activity is used by a state machine to pause its
execution and let external code get current state data and send a response before the
state machine is resumed.
"""
import logging
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
# snippet-start:[python.example_code.sfn.Activity_full]
# snippet-start:[python.example_code.sfn.Activity_decl]
class Activity:
"""Encapsulates Step Function activity actions."""
def __init__(self, stepfunctions_client):
"""
:param stepfunctions_client: A Boto3 Step Functions client.
"""
self.stepfunctions_client = stepfunctions_client
# snippet-end:[python.example_code.sfn.Activity_decl]
# snippet-start:[python.example_code.sfn.CreateActivity]
def create(self, name):
"""
Create an activity.
:param name: The name of the activity to create.
:return: The Amazon Resource Name (ARN) of the newly created activity.
"""
try:
response = self.stepfunctions_client.create_activity(name=name)
except ClientError as err:
logger.error(
"Couldn't create activity %s. Here's why: %s: %s",
name,
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
else:
return response["activityArn"]
# snippet-end:[python.example_code.sfn.CreateActivity]
# snippet-start:[python.example_code.sfn.ListActivities]
def find(self, name):
"""
Find an activity by name. This requires listing activities until one is found
with a matching name.
:param name: The name of the activity to search for.
:return: If found, the ARN of the activity; otherwise, None.
"""
try:
paginator = self.stepfunctions_client.get_paginator("list_activities")
for page in paginator.paginate():
for activity in page.get("activities", []):
if activity["name"] == name:
return activity["activityArn"]
except ClientError as err:
logger.error(
"Couldn't list activities. Here's why: %s: %s",
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
# snippet-end:[python.example_code.sfn.ListActivities]
# snippet-start:[python.example_code.sfn.GetActivityTask]
def get_task(self, activity_arn):
"""
Gets task data for an activity. When a state machine is waiting for the
specified activity, a response is returned with data from the state machine.
When a state machine is not waiting, this call blocks for 60 seconds.
:param activity_arn: The ARN of the activity to get task data for.
:return: The task data for the activity.
"""
try:
response = self.stepfunctions_client.get_activity_task(
activityArn=activity_arn
)
except ClientError as err:
logger.error(
"Couldn't get a task for activity %s. Here's why: %s: %s",
activity_arn,
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
else:
return response
# snippet-end:[python.example_code.sfn.GetActivityTask]
# snippet-start:[python.example_code.sfn.SendTaskSuccess]
def send_task_success(self, task_token, task_response):
"""
Sends a success response to a waiting activity step. A state machine with an
activity step waits for the activity to get task data and then respond with
either success or failure before it resumes processing.
:param task_token: The token associated with the task. This is included in the
response to the get_activity_task action and must be sent
without modification.
:param task_response: The response data from the activity. This data is
received and processed by the state machine.
"""
try:
self.stepfunctions_client.send_task_success(
taskToken=task_token, output=task_response
)
except ClientError as err:
logger.error(
"Couldn't send task success. Here's why: %s: %s",
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
# snippet-end:[python.example_code.sfn.SendTaskSuccess]
# snippet-start:[python.example_code.sfn.DeleteActivity]
def delete(self, activity_arn):
"""
Delete an activity.
:param activity_arn: The ARN of the activity to delete.
"""
try:
response = self.stepfunctions_client.delete_activity(
activityArn=activity_arn
)
except ClientError as err:
logger.error(
"Couldn't delete activity %s. Here's why: %s: %s",
activity_arn,
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
else:
return response
# snippet-end:[python.example_code.sfn.DeleteActivity]
# snippet-end:[python.example_code.sfn.Activity_full]