2019-02-04 18:08:27 -08:00
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
2019-04-10 16:42:42 -07:00
# snippet-comment:[These are tags for the AWS doc team's sample catalog. Do not remove.]
# snippet-sourcedescription:[SqsQueueNotificationWorker.py demonstrates how to create a separate process to handle notification messages for an Elastic Transcoder job.]
# snippet-service:[elastictranscoder]
# snippet-keyword:[Amazon Elastic Transcoder]
# snippet-keyword:[Python]
2019-08-27 12:07:08 -07:00
# snippet-sourcesyntax:[python]
2019-09-05 20:06:43 -07:00
# snippet-sourcesyntax:[python]
2019-04-10 16:42:42 -07:00
# snippet-sourcetype:[snippet]
# snippet-sourcedate:[2019-02-04]
# snippet-sourceauthor:[AWS]
2019-02-04 18:08:27 -08:00
# Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# This file is licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License. A copy of the
# License is located at
#
# http://aws.amazon.com/apache2.0/
#
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
# OF ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# snippet-start:[elastictranscoder.python.create_sqs_notification_queue.import]
from ctypes import c_bool
from enum import Enum , auto
import json
import multiprocessing
import pprint
import boto3
class JobStatus ( Enum ) :
""" Status of an Elastic Transcoder job """
2023-10-18 10:35:05 -07:00
SUCCESS = auto ( ) # Elastic Transcoder job finished successfully
ERROR = auto ( ) # Elastic Transcoder job failed
RUNNING = auto ( ) # Job is running
UNKNOWN = auto ( ) # SqsWorker process was aborted
2019-02-04 18:08:27 -08:00
class ProcessStatus ( Enum ) :
""" Status of an SqsWorker process """
2023-10-18 10:35:05 -07:00
READY = auto ( ) # Initialized, but not yet started
IN_PROGRESS = auto ( ) # Started and monitoring notifications
ABORTED = auto ( ) # Aborted before Elastic Transcoder job finished
FINISHED = auto ( ) # Finished after handling all job notifications
2019-02-04 18:08:27 -08:00
class SqsWorker :
""" Monitors SQS notifications for an Elastic Transcoder job
Each Elastic Transcoder job/JobMonitor must have its own SqsWorker
object. The SqsWorker handles messages for the job. Messages for other
jobs are ignored.
The SysWorker performs its task in a separate process. The JobMonitor
starts the process by calling SysWorker.start().
While the SysWorker process is handling job notifications, the JobMonitor
parent process can perform other tasks, including starting new jobs with
new JobMonitor and SqsWorker objects.
When the Transcoder job is finished, a SysWorker flag is set. The
JobMonitor parent process must periodically retrieve the current setting
of the flag by calling SysWorker.finished().
2019-04-10 16:42:42 -07:00
When the Transcoder job has finished, the JobMonitor parent process must
2019-02-04 18:08:27 -08:00
terminate the SysWorker process by calling SysWorker.stop().
The final result of the completed job can be retrieved by calling
SysWorker.job_status().
"""
2023-10-18 10:35:05 -07:00
2019-02-04 18:08:27 -08:00
def __init__ ( self , job_id , sqs_queue_name ) :
""" Initialize an SqsWorker object to process SQS notification
messages for a particular Elastic Transcoder job.
:param job_id: string; Elastic Transcoder job ID to monitor
:param sqs_queue_name: string; Name of SQS queue subscribed to receive
notifications for job_id
"""
self . _job_id = job_id
self . _finished = multiprocessing . Value ( c_bool , False )
2023-10-18 10:35:05 -07:00
self . _job_status = multiprocessing . Value ( " i " , JobStatus . RUNNING . value )
self . _process_status = multiprocessing . Value ( " i " , ProcessStatus . READY . value )
self . _args = (
job_id ,
sqs_queue_name ,
self . _finished ,
self . _job_status ,
self . _process_status ,
)
2019-02-04 18:08:27 -08:00
self . _process = None
def start ( self ) :
""" Start a new SqsWorker process to handle the job ' s notifications """
if self . _process is not None :
2023-10-18 10:35:05 -07:00
raise RuntimeError ( " SqsQueueNotificationWorker already running. " )
self . _process = multiprocessing . Process (
target = poll_and_handle_messages , args = self . _args
)
2019-02-04 18:08:27 -08:00
self . _process . start ( )
self . _process_status . value = ProcessStatus . IN_PROGRESS . value
def stop ( self ) :
""" Stop the SqsWorker process """
if self . _process is None :
2023-10-18 10:35:05 -07:00
raise RuntimeError ( " SqsQueueNotificationWorker already stopped. " )
2019-02-04 18:08:27 -08:00
if self . _process . is_alive ( ) :
# Aborting the process before the job is finished
self . _process_status . value = ProcessStatus . ABORTED . value
self . _job_status . value = JobStatus . UNKNOWN . value
self . _finished . value = True
self . _process . join ( )
def finished ( self ) :
""" Finished = Job completed successfully or job terminated with error
or monitoring of notifications was aborted before receiving a
job-completed notification
"""
return self . _finished . value
def job_status ( self ) :
return JobStatus ( self . _job_status . value )
def process_status ( self ) :
return ProcessStatus ( self . _process_status . value )
def __repr__ ( self ) :
2023-10-18 10:35:05 -07:00
return (
f " SqsWorker(Job ID: { self . _job_id } , "
f " Status: { ProcessStatus ( self . _process_status . value ) . name } ) "
)
2019-02-04 18:08:27 -08:00
2023-10-18 10:35:05 -07:00
def poll_and_handle_messages (
job_id , sqs_queue_name , finished , job_status , process_status
) :
2019-02-04 18:08:27 -08:00
""" Process SQS notifications for a particular Elastic Transcoder job
This method runs as a separate process.
:param job_id: string; Elastic Transcoder job ID to monitor
:param sqs_queue_name: string; Name of SQS queue
:param finished: boolean; Shared memory flag. While this method is running,
the flag might be set externally if the JobMonitor parent process instructs
us to stop before we receive notification that the job has finished.
Otherwise, this method sets the finished flag when the Transcoder job
finishes.
:param job_status: int/JobStatus enum; Shared memory variable containing
the Transcoder job status
:param process_status: int/ProcessStatus enum; Shared memory variable
containing the SysWorker process status
"""
2023-10-18 10:35:05 -07:00
sqs_client = boto3 . client ( " sqs " )
2019-02-04 18:08:27 -08:00
response = sqs_client . get_queue_url ( QueueName = sqs_queue_name )
2023-10-18 10:35:05 -07:00
sqs_queue_url = response [ " QueueUrl " ]
2019-02-04 18:08:27 -08:00
# Loop until the job is finished or the JobMonitor parent process instructs
# us to stop
while not finished . value :
2023-10-18 10:35:05 -07:00
response = sqs_client . receive_message (
QueueUrl = sqs_queue_url ,
MaxNumberOfMessages = 5 ,
WaitTimeSeconds = 5 ,
)
2019-02-04 18:08:27 -08:00
# Any messages received?
2023-10-18 10:35:05 -07:00
if " Messages " not in response :
2019-02-04 18:08:27 -08:00
continue
# Process each message
2023-10-18 10:35:05 -07:00
for message in response [ " Messages " ] :
2019-02-04 18:08:27 -08:00
# Extract the message part of the body
2023-10-18 10:35:05 -07:00
notification = json . loads ( json . loads ( message [ " Body " ] ) [ " Message " ] )
2019-02-04 18:08:27 -08:00
# Show the notification information
2023-10-18 10:35:05 -07:00
print ( " Notification: " )
2019-02-04 18:08:27 -08:00
pprint . pprint ( notification )
# Is the message for this job?
2023-10-18 10:35:05 -07:00
if notification [ " jobId " ] == job_id :
2019-02-04 18:08:27 -08:00
# Delete the message from the queue
2023-10-18 10:35:05 -07:00
sqs_client . delete_message (
QueueUrl = sqs_queue_url , ReceiptHandle = message [ " ReceiptHandle " ]
)
2019-02-04 18:08:27 -08:00
# Did the job finish, either successfully or with error?
2023-10-18 10:35:05 -07:00
if notification [ " state " ] == " COMPLETED " :
2019-02-04 18:08:27 -08:00
# Set shared memory flags
job_status . value = JobStatus . SUCCESS . value
process_status . value = ProcessStatus . FINISHED . value
finished . value = True
2023-10-18 10:35:05 -07:00
elif notification [ " state " ] == " ERROR " :
2019-02-04 18:08:27 -08:00
job_status . value = JobStatus . ERROR . value
process_status . value = ProcessStatus . FINISHED . value
finished . value = True
2023-10-18 10:35:05 -07:00
2019-02-04 18:08:27 -08:00
# snippet-end:[elastictranscoder.python.create_sqs_notification_queue.import]