Follow along with the README.md file at the root of the repository to set up your environment.
After setting up your environment, navigate to the directory where all the code for this tutorial is located.
cd dev-day-zoom-out/track_1_build_workflows/session_2_resilent_workflows/1_starting_flow
You’re more than welcome to work off the existing example, but if you’d like to start fresh, follow the tutorial end-to-end to build your very own pipeline!
Before building your pipeline, you’ll need to set up the necessary resources in AWS, MotherDuck, and Prefect Cloud.
If you’re a seasoned AWS user and already have an IAM User and S3 bucket ready to go, you can skip to the Create a Prefect AWS Credentials block section.
The following steps will help you set up the AWS resources and Prefect blocks for your workflow.
Sign in to the AWS Console to create an AWS IAM User and an AWS S3 bucket.
Sign in to Prefect Cloud to create a Prefect AWS credentials block and an S3 bucket block.
The Access Key ID and Secret Access Key from the IAM User can be stored in a Prefect AWS Credentials block.
This Prefect block allows your flows and tasks to connect to your AWS resources.
You can create this block with the Python SDK or through the Prefect Cloud UI.
Create a new Python file in your project directory and add the following code:
```python
from prefect_aws import AwsCredentials

AwsCredentials(
    aws_access_key_id="PLACEHOLDER",  # Replace this with your access key id.
    aws_secret_access_key="PLACEHOLDER",  # Replace this with your secret access key.
    region_name="us-east-2"  # Replace this with your region.
).save("BLOCK-NAME-PLACEHOLDER")  # Replace this with a descriptive block name.
```
Make sure you have prefect-aws installed in your environment before running the script.
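For example, you can install it into your active environment with pip:

```bash
pip install prefect-aws
```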
S3 bucket blocks are used to store your S3 bucket configuration.
Your flows and tasks use this block to connect to your S3 bucket.
As with the AWS credentials block, you can create this block with the Python SDK or through the Prefect Cloud UI.
Create a new Python file in your project directory and add the following code:
```python
from prefect_aws import S3Bucket, AwsCredentials

aws_credentials = AwsCredentials.load("NAME-OF-YOUR-AWS-CREDENTIALS-BLOCK")  # Replace this with your AWS credentials block name.

S3Bucket(
    bucket_name="YOUR-S3-BUCKET-NAME",  # Replace this with your S3 bucket name.
    credentials=aws_credentials  # Pass in the credentials block loaded above.
).save("BLOCK-NAME-PLACEHOLDER")  # Replace this with a descriptive block name.
```
Important: Make sure you have prefect-aws installed in your environment before running the script.
Prefect secret blocks are used to store sensitive information, such as your MotherDuck access token.
This secret block will allow your flows and tasks to connect to your MotherDuck database.
You can create a secret block by using the Python SDK or Prefect Cloud UI.
Create a new Python file in your project directory and add the following code:
```python
from prefect.blocks.system import Secret

Secret(
    value="YOUR-MOTHERDUCK-TOKEN"  # Replace this with your MotherDuck access token.
).save("BLOCK-NAME-PLACEHOLDER")  # Replace this with a descriptive block name, such as motherduck-access-token.
```
Create a new directory on your computer to store your project files.
Within the root of the directory, create three subfolders for storing the pre- and post-processed game data, and a Python file for your Prefect flow:
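Based on the file paths used later in this tutorial, the project layout looks roughly like this (the project directory name is up to you; the subfolder and file names are the ones assumed throughout):

```
your-project-directory/
├── raw_data/
├── boxscore_analysis/
├── boxscore_parquet/
└── mlb_flow.py
```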
First, import the necessary packages, then create three tasks: two that fetch game data by using the statsapi package, and one that saves the raw data to a local file.
You’ll also create a flow that calls these tasks and defines the dependencies between them.
```python
from prefect import flow, task, runtime
from prefect.artifacts import create_markdown_artifact
from prefect_aws.s3 import S3Bucket
from prefect.blocks.system import Secret
from datetime import datetime
import statsapi
import json
import pandas as pd
import duckdb


@task
def get_recent_games(team_name, start_date, end_date):
    '''This task will fetch the schedule for the provided team and date range and return the game ids.'''
    team = statsapi.lookup_team(team_name)
    schedule = statsapi.schedule(team=team[0]["id"], start_date=start_date, end_date=end_date)
    for game in schedule:
        print(game['game_id'])
    return [game['game_id'] for game in schedule]


@task
def fetch_single_game_boxscore(game_id, start_date, end_date, team_name):
    '''This task will fetch the boxscore for a single game and return the game data.'''
    boxscore = statsapi.boxscore_data(game_id)

    # Extract the relevant data.
    home_score = boxscore['home']['teamStats']['batting']['runs']
    away_score = boxscore['away']['teamStats']['batting']['runs']
    home_team = boxscore['teamInfo']['home']['teamName']
    away_team = boxscore['teamInfo']['away']['teamName']
    time_value = next(item['value'] for item in boxscore['gameBoxInfo'] if item['label'] == 'T')

    # Create a dictionary with the game data.
    game_data = {
        'search_start_date': start_date,
        'search_end_date': end_date,
        'chosen_team_name': team_name,
        'game_id': game_id,
        'home_team': home_team,
        'away_team': away_team,
        'home_score': home_score,
        'away_score': away_score,
        'score_differential': abs(home_score - away_score),
        'game_time': time_value,
    }
    print(game_data)
    return game_data


# Note: save_raw_data_to_file is called by the flow below; this minimal implementation
# writes the list of game data dictionaries to a local JSON file.
@task
def save_raw_data_to_file(game_data, file_path):
    '''This task will save the raw game data to a local JSON file.'''
    with open(file_path, 'w') as f:
        json.dump(game_data, f, indent=4, sort_keys=True)
    print(file_path)
    return file_path


@flow
def mlb_flow(team_name, start_date, end_date):
    # Get recent games.
    game_ids = get_recent_games(team_name, start_date, end_date)

    # Fetch boxscore for each game.
    game_data = [fetch_single_game_boxscore(game_id, start_date, end_date, team_name) for game_id in game_ids]

    # Define file path for raw data.
    today = datetime.now().strftime("%Y-%m-%d")  # This uses the current date in the format YYYY-MM-DD.
    flow_run_name = runtime.flow_run.name
    raw_file_path = f"./raw_data/{today}-{team_name}-{flow_run_name}-boxscore.json"

    # Save raw data to a local folder.
    save_raw_data_to_file(game_data, raw_file_path)


if __name__ == "__main__":
    mlb_flow("marlins", '06/01/2024', '06/30/2024')
```
If you run this flow, you’ll see the raw data saved to the raw_data folder in your project directory.
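To run the flow locally, execute the script directly (this assumes you saved it as mlb_flow.py, the file name used throughout this tutorial):

```bash
python mlb_flow.py
```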
For the next two tasks, you’ll upload the raw data file to S3 and pull it back down for further processing.
You’ll need to add these new tasks to your flow function as well.
Copy and paste the following code into your mlb_flow.py file.
```python
@task
def upload_raw_data_to_s3(file_path):
    '''This task will upload the raw data to s3.'''
    s3_bucket = S3Bucket.load("mlb-raw-data")  # Replace this with your S3 bucket block name.
    s3_bucket_path = s3_bucket.upload_from_path(file_path)
    print(s3_bucket_path)
    return s3_bucket_path


@task
def download_raw_data_from_s3(s3_file_path):
    '''Download the raw data from s3.'''
    s3_bucket = S3Bucket.load("mlb-raw-data")  # Replace this with your S3 bucket block name.
    local_file_path = f"./boxscore_analysis/{s3_file_path}"
    s3_bucket.download_object_to_path(s3_file_path, local_file_path)
    return local_file_path
```
Update the mlb_flow function to include the new tasks.
```python
@flow
def mlb_flow(team_name, start_date, end_date):
    # Get recent games.
    game_ids = get_recent_games(team_name, start_date, end_date)

    # Fetch boxscore for each game.
    game_data = [fetch_single_game_boxscore(game_id, start_date, end_date, team_name) for game_id in game_ids]

    # Define file path for raw data.
    today = datetime.now().strftime("%Y-%m-%d")  # This uses the current date in the format YYYY-MM-DD.
    flow_run_name = runtime.flow_run.name
    raw_file_path = f"./raw_data/{today}-{team_name}-{flow_run_name}-boxscore.json"

    # Save raw data to a local folder.
    save_raw_data_to_file(game_data, raw_file_path)

    # Upload raw data to s3.
    s3_file_path = upload_raw_data_to_s3(raw_file_path)

    # Download raw data from s3.
    raw_data = download_raw_data_from_s3(s3_file_path)


if __name__ == "__main__":
    mlb_flow("marlins", '06/01/2024', '06/30/2024')
```
Once you have your flow function updated, you can run the script to see the data file arrive in your S3 bucket.
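If you have the AWS CLI installed and configured, one way to confirm the upload is to list the bucket contents (replace the bucket name with your own):

```bash
aws s3 ls s3://YOUR-S3-BUCKET-NAME/
```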
Create a set of functions that clean up the raw data, run some basic statistics, and save the results to a MotherDuck database.
Copy and paste the following code into your mlb_flow.py file.
```python
@task
def clean_time_value(data_file_path):
    '''This task will clean the time value.'''
    try:
        with open(data_file_path, 'r') as f:
            game_data_list = json.load(f)
    except FileNotFoundError:
        raise ValueError(f"File not found: {data_file_path}")
    except json.JSONDecodeError:
        raise ValueError(f"Invalid JSON file: {data_file_path}")

    # Process each game in the list.
    for game_data in game_data_list:
        # Remove any extra text like '(1:16 delay)'.
        if '(' in game_data['game_time']:
            game_data['game_time'] = game_data['game_time'].split('(')[0]

        # Remove any non-digit, non-colon characters.
        game_data['game_time'] = ''.join(char for char in game_data['game_time'] if char.isdigit() or char == ':')

        hours, minutes = map(int, game_data['game_time'].split(':'))
        game_data['game_time_in_minutes'] = hours * 60 + minutes

    # Save the modified data back to the file.
    with open(data_file_path, 'w') as f:
        json.dump(game_data_list, f, indent=4, sort_keys=True)

    return data_file_path


@task
def analyze_games(data_file_path):
    '''This task will analyze the game data and return the analysis.'''
    try:
        with open(data_file_path, 'r') as f:
            game_data = json.load(f)
    except FileNotFoundError:
        raise ValueError(f"File not found: {data_file_path}")
    except json.JSONDecodeError:
        raise ValueError(f"Invalid JSON file: {data_file_path}")

    # Convert to a DataFrame.
    df = pd.DataFrame(game_data)

    # Get the search parameters.
    start_date = df['search_start_date'].unique()[0]
    end_date = df['search_end_date'].unique()[0]
    team_name = df['chosen_team_name'].unique()[0]

    # Calculate average, median, max, and min differential.
    avg_differential = float(df['score_differential'].mean())
    median_differential = float(df['score_differential'].median())
    max_differential = float(df['score_differential'].max())
    min_differential = float(df['score_differential'].min())

    # Calculate average, median, max, and min game time.
    avg_game_time = float(df['game_time_in_minutes'].mean())
    median_game_time = float(df['game_time_in_minutes'].median())
    max_game_time = float(df['game_time_in_minutes'].max())
    min_game_time = float(df['game_time_in_minutes'].min())

    # Calculate correlation between game time and score differential.
    correlation = float(df['game_time_in_minutes'].corr(df['score_differential']))

    game_analysis = {
        'search_start_date': start_date,
        'search_end_date': end_date,
        'chosen_team_name': team_name,
        'max_game_time': max_game_time,
        'min_game_time': min_game_time,
        'median_game_time': median_game_time,
        'average_game_time': avg_game_time,
        'max_differential': max_differential,
        'min_differential': min_differential,
        'median_differential': median_differential,
        'average_differential': avg_differential,
        'time_differential_correlation': correlation,
    }
    print(game_analysis)
    return game_analysis


@task
def save_analysis_to_file(game_analysis, file_name):
    '''This task will save the analysis to a file.'''
    df = pd.DataFrame([game_analysis])
    df.to_parquet(file_name)
    print(file_name)
    return file_name


@task
def load_parquet_to_duckdb(parquet_file_path, team_name):
    '''This task will load the parquet file to duckdb.'''
    # Connect to duckdb.
    secret_block = Secret.load("mother-duck-token")  # Replace this with your secret block name.

    # Access the stored secret.
    duck_token = secret_block.get()
    duckdb_conn = duckdb.connect(f"md:?motherduck_token={duck_token}")

    # Create a table in duckdb.
    duckdb_conn.execute(f"""CREATE TABLE IF NOT EXISTS boxscore_analysis_{team_name} (
        search_start_date TEXT,
        search_end_date TEXT,
        chosen_team_name TEXT,
        max_game_time FLOAT,
        min_game_time FLOAT,
        median_game_time FLOAT,
        average_game_time FLOAT,
        max_differential FLOAT,
        min_differential FLOAT,
        median_differential FLOAT,
        average_differential FLOAT,
        time_differential_correlation FLOAT)""")

    # Use the COPY command to load the Parquet file into MotherDuck.
    duckdb_conn.execute(f"COPY boxscore_analysis_{team_name} FROM '{parquet_file_path}' (FORMAT 'parquet')")
```
Update the mlb_flow function to include the new tasks.
```python
@flow
def mlb_flow(team_name, start_date, end_date):
    # Get recent games.
    game_ids = get_recent_games(team_name, start_date, end_date)

    # Fetch boxscore for each game.
    game_data = [fetch_single_game_boxscore(game_id, start_date, end_date, team_name) for game_id in game_ids]

    # Define file path for raw data.
    today = datetime.now().strftime("%Y-%m-%d")  # This uses the current date in the format YYYY-MM-DD.
    flow_run_name = runtime.flow_run.name
    raw_file_path = f"./raw_data/{today}-{team_name}-{flow_run_name}-boxscore.json"

    # Save raw data to a local folder.
    save_raw_data_to_file(game_data, raw_file_path)

    # Upload raw data to s3.
    s3_file_path = upload_raw_data_to_s3(raw_file_path)

    # Download raw data from s3.
    raw_data = download_raw_data_from_s3(s3_file_path)

    # Clean the time value.
    clean_data = clean_time_value(raw_data)

    # Analyze the results.
    results = analyze_games(clean_data)

    # Save the results to a file.
    parquet_file_path = f"./boxscore_parquet/{today}-{team_name}-{flow_run_name}-game-analysis.parquet"
    save_analysis_to_file(results, parquet_file_path)

    # Load the results to duckdb.
    load_parquet_to_duckdb(parquet_file_path, team_name)


if __name__ == "__main__":
    mlb_flow("marlins", '06/01/2024', '06/30/2024')
```
Once you have your flow function updated, you can run the script to see the processed data arrive in your MotherDuck database.
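As a quick sanity check, you can also query the new table directly from Python. This sketch assumes the default flow parameters used in this tutorial (team_name="marlins", so the table is named boxscore_analysis_marlins) and the same secret block name loaded in the flow:

```python
import duckdb
from prefect.blocks.system import Secret

# Load the MotherDuck token from the same secret block used in the flow.
duck_token = Secret.load("mother-duck-token").get()  # Replace this with your secret block name.
conn = duckdb.connect(f"md:?motherduck_token={duck_token}")

# Print the analysis rows loaded by the flow.
print(conn.execute("SELECT * FROM boxscore_analysis_marlins").df())
```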
As a last step, add some visibility into the results of your flow.
You can add a task that creates a markdown artifact to view the results in the Prefect Cloud UI.
Copy and paste the following code into your mlb_flow.py file.
```python
@task
def game_analysis_artifact(game_analysis, game_data_path):
    '''This task will create an artifact with the game analysis.'''
    # First read the JSON data from the file.
    with open(game_data_path, 'r') as f:
        game_data = json.load(f)

    # Now create the DataFrame from the loaded data.
    df = pd.DataFrame(game_data)

    # Create the markdown report.
    markdown_report = f"""# Game Analysis Report

## Search Parameters
Search Start Date: {game_analysis['search_start_date']}
Search End Date: {game_analysis['search_end_date']}
Chosen Team Name: {game_analysis['chosen_team_name']}

## Summary Statistics
Max game time: {game_analysis['max_game_time']:.2f}
Min game time: {game_analysis['min_game_time']:.2f}
Median game time: {game_analysis['median_game_time']:.2f}
Average game time: {game_analysis['average_game_time']:.2f}
Max differential: {game_analysis['max_differential']:.2f}
Min differential: {game_analysis['min_differential']:.2f}
Median differential: {game_analysis['median_differential']:.2f}
Average differential: {game_analysis['average_differential']:.2f}
Correlation between game time and score differential: {game_analysis['time_differential_correlation']:.2f}

## Raw Data
{df.to_markdown(index=False)}
"""

    create_markdown_artifact(
        key="game-analysis",
        markdown=markdown_report,
        description="Game analysis report"
    )
```
Update the mlb_flow function to include the new task.
```python
@flow
def mlb_flow(team_name, start_date, end_date):
    # Get recent games.
    game_ids = get_recent_games(team_name, start_date, end_date)

    # Fetch boxscore for each game.
    game_data = [fetch_single_game_boxscore(game_id, start_date, end_date, team_name) for game_id in game_ids]

    # Define file path for raw data.
    today = datetime.now().strftime("%Y-%m-%d")  # This uses the current date in the format YYYY-MM-DD.
    flow_run_name = runtime.flow_run.name
    raw_file_path = f"./raw_data/{today}-{team_name}-{flow_run_name}-boxscore.json"

    # Save raw data to a local folder.
    save_raw_data_to_file(game_data, raw_file_path)

    # Upload raw data to s3.
    s3_file_path = upload_raw_data_to_s3(raw_file_path)

    # Download raw data from s3.
    raw_data = download_raw_data_from_s3(s3_file_path)

    # Clean the time value.
    clean_data = clean_time_value(raw_data)

    # Analyze the results.
    results = analyze_games(clean_data)

    # Save the results to a file.
    parquet_file_path = f"./boxscore_parquet/{today}-{team_name}-{flow_run_name}-game-analysis.parquet"
    save_analysis_to_file(results, parquet_file_path)

    # Load the results to duckdb.
    load_parquet_to_duckdb(parquet_file_path, team_name)

    # Save the results to an artifact.
    game_analysis_artifact(results, raw_data)


if __name__ == "__main__":
    mlb_flow("marlins", '06/01/2024', '06/30/2024')
```
After adding the new code, run the updated flow to see the markdown artifact!
Now that you have a complete ETL pipeline, you can outfit it with some failure handling and data quality checks to make it more resilient.
You might even consider deploying it to serverless infrastructure, so that you don’t have to keep your laptop running 24/7!
Jump to the next tutorial to learn how you can use Prefect to do just that: Deploy resilient pipelines to serverless infrastructure