# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 """ This Boto3 example performs these tasks: * * 1. Prompts the user for a unique cluster ID or use the default value. * 2. Creates a Redshift cluster with the specified or default cluster id value. * 3. Waits until the Redshift cluster is available for use. * 4. Lists all databases using a pagination API call. * 5. Creates a table named "Movies" with fields ID, title, and year. * 6. Inserts a specified number of records into the "Movies" table by reading the Movies JSON file. * 7. Prompts the user for a movie release year. * 8. Runs a SQL query to retrieve movies released in the specified year. * 9. Modifies the Redshift cluster. * 10. Prompts the user for confirmation to delete the Redshift cluster. * 11. If confirmed, deletes the specified Redshift cluster. """ import json import os import sys import time import logging import boto3 from redshift import RedshiftWrapper from redshift_data import RedshiftDataWrapper # Add relative path to include demo_tools in this code example without need for setup. sys.path.append("../..") import demo_tools.question as q DASHES = "-" * 80 # snippet-start:[python.example_code.redshift.redshift_scenario.RedshiftScenario] class RedshiftScenario: """Runs an interactive scenario that shows how to get started with Redshift.""" def __init__(self, redshift_wrapper, redshift_data_wrapper): self.redshift_wrapper = redshift_wrapper self.redshift_data_wrapper = redshift_data_wrapper def redhift_scenario(self, json_file_path): database_name = "dev" print(DASHES) print("Welcome to the Amazon Redshift SDK Getting Started example.") print( """ This Python program demonstrates how to interact with Amazon Redshift using the AWS SDK for Python (Boto3). Amazon Redshift is a fully managed, petabyte-scale data warehouse service hosted in the cloud. The program's primary functionalities include cluster creation, verification of cluster readiness, listing databases, table creation, populating data within the table, and executing SQL statements. It also demonstrates querying data from the Movies table. Upon completion, all AWS resources are cleaned up. """ ) if not os.path.isfile(json_file_path): logging.error(f"The file {json_file_path} does not exist.") return print("Let's get started...") user_name = q.ask("Please enter your user name (default is awsuser):") user_name = user_name if user_name else "awsuser" print(DASHES) user_password = q.ask( "Please enter your user password (default is AwsUser1000):" ) user_password = user_password if user_password else "AwsUser1000" print(DASHES) print( """A Redshift cluster refers to the collection of computing resources and storage that work together to process and analyze large volumes of data.""" ) cluster_id = q.ask( "Enter a cluster identifier value (default is redshift-cluster-movies): " ) cluster_id = cluster_id if cluster_id else "redshift-cluster-movies" self.redshift_wrapper.create_cluster( cluster_id, "ra3.4xlarge", user_name, user_password, True, 2 ) print(DASHES) print(f"Wait until {cluster_id} is available. This may take a few minutes...") q.ask("Press Enter to continue...") self.wait_cluster_available(cluster_id) print(DASHES) print( f""" When you created {cluster_id}, the dev database is created by default and used in this scenario. To create a custom database, you need to have a CREATEDB privilege. For more information, see the documentation here: https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_DATABASE.html. """ ) q.ask("Press Enter to continue...") print(DASHES) print(DASHES) print(f"List databases in {cluster_id}") q.ask("Press Enter to continue...") databases = self.redshift_data_wrapper.list_databases( cluster_id, database_name, user_name ) print(f"The cluster contains {len(databases)} database(s).") for database in databases: print(f" Database: {database}") print(DASHES) print(DASHES) print("Now you will create a table named Movies.") q.ask("Press Enter to continue...") self.create_table(cluster_id, database_name, user_name) print(DASHES) print("Populate the Movies table using the Movies.json file.") print( "Specify the number of records you would like to add to the Movies Table." ) print("Please enter a value between 50 and 200.") while True: try: num_records = int(q.ask("Enter a value: ", q.is_int)) if 50 <= num_records <= 200: break else: print("Invalid input. Please enter a value between 50 and 200.") except ValueError: print("Invalid input. Please enter a value between 50 and 200.") self.populate_table( cluster_id, database_name, user_name, json_file_path, num_records ) print(DASHES) print("Query the Movies table by year. Enter a value between 2012-2014.") while True: movie_year = int(q.ask("Enter a year: ", q.is_int)) if 2012 <= movie_year <= 2014: break else: print("Invalid input. Please enter a valid year between 2012 and 2014.") # Function to query database sql_id = self.query_movies_by_year( database_name, user_name, movie_year, cluster_id ) print(f"The identifier of the statement is {sql_id}") print("Checking statement status...") self.wait_statement_finished(sql_id) result = self.redshift_data_wrapper.get_statement_result(sql_id) self.display_movies(result) print(DASHES) print(DASHES) print("Now you will modify the Redshift cluster.") q.ask("Press Enter to continue...") preferred_maintenance_window = "wed:07:30-wed:08:00" self.redshift_wrapper.modify_cluster(cluster_id, preferred_maintenance_window) print(DASHES) print(DASHES) delete = q.ask("Do you want to delete the cluster? (y/n) ", q.is_yesno) if delete: print(f"You selected to delete {cluster_id}") q.ask("Press Enter to continue...") self.redshift_wrapper.delete_cluster(cluster_id) else: print(f"Cluster {cluster_id}cluster_id was not deleted") print(DASHES) print("This concludes the Amazon Redshift SDK Getting Started scenario.") print(DASHES) # snippet-start:[python.example_code.redshift.redshift_scenario.create_table] def create_table(self, cluster_id, database, username): self.redshift_data_wrapper.execute_statement( cluster_identifier=cluster_id, database_name=database, user_name=username, sql="CREATE TABLE Movies (statement_id INT PRIMARY KEY, title VARCHAR(100), year INT)", ) print("Table created: Movies") # snippet-end:[python.example_code.redshift.redshift_scenario.create_table] def populate_table(self, cluster_id, database, username, file_name, number): with open(file_name) as f: data = json.load(f) i = 0 for record in data: if i == number: break statement_id = i title = record["title"] year = record["year"] i = i + 1 parameters = [ {"name": "statement_id", "value": str(statement_id)}, {"name": "title", "value": title}, {"name": "year", "value": str(year)}, ] self.redshift_data_wrapper.execute_statement( cluster_identifier=cluster_id, database_name=database, user_name=username, sql="INSERT INTO Movies VALUES(:statement_id, :title, :year)", parameter_list=parameters, ) print(f"{i} records inserted into Movies table") def wait_cluster_available(self, cluster_id): """ Waits for a cluster to be available. :param cluster_id: The cluster identifier. Note: The cluster_available waiter can also be used. It is not used in this case to allow an elapsed time message. """ cluster_ready = False start_time = time.time() while not cluster_ready: time.sleep(30) cluster = self.redshift_wrapper.describe_clusters(cluster_id) status = cluster[0]["ClusterStatus"] if status == "available": cluster_ready = True elif status != "creating": raise Exception( f"Cluster {cluster_id} creation failed with status {status}." ) elapsed_seconds = int(round(time.time() - start_time)) minutes = int(elapsed_seconds // 60) seconds = int(elapsed_seconds % 60) print(f"Elapsed Time: {minutes}:{seconds:02d} - status {status}...") if minutes > 30: raise Exception( f"Cluster {cluster_id} is not available after 30 minutes." ) def query_movies_by_year(self, database, username, year, cluster_id): sql = "SELECT * FROM Movies WHERE year = :year" params = [{"name": "year", "value": str(year)}] response = self.redshift_data_wrapper.execute_statement( cluster_identifier=cluster_id, database_name=database, user_name=username, sql=sql, parameter_list=params, ) return response["Id"] @staticmethod def display_movies(response): metadata = response["ColumnMetadata"] records = response["Records"] title_column_index = None for i in range(len(metadata)): if metadata[i]["name"] == "title": title_column_index = i break if title_column_index is None: print("No title column found.") return print(f"Found {len(records)} movie(s).") for record in records: print(f" {record[title_column_index]['stringValue']}") def wait_statement_finished(self, sql_id): while True: time.sleep(1) response = self.redshift_data_wrapper.describe_statement(sql_id) status = response["Status"] print(f"Statement status is {status}.") if status == "FAILED": print(f"The query failed because {response['Error']}. Ending program") raise Exception("The Query Failed. Ending program") elif status == "FINISHED": break # snippet-end:[python.example_code.redshift.redshift_scenario.RedshiftScenario] # snippet-start:[python.example_code.redshift.redshift_scenario.RedshiftScenario.main] def main(): redshift_client = boto3.client("redshift") redshift_data_client = boto3.client("redshift-data") redshift_wrapper = RedshiftWrapper(redshift_client) redshift_data_wrapper = RedshiftDataWrapper(redshift_data_client) redshift_scenario = RedshiftScenario(redshift_wrapper, redshift_data_wrapper) redshift_scenario.redhift_scenario( f"{os.path.dirname(__file__)}/../../../resources/sample_files/movies.json" ) # snippet-end:[python.example_code.redshift.redshift_scenario.RedshiftScenario.main] if __name__ == "__main__": main()