Learn how to build resilient and performant data pipelines with Prefect.
In the Quickstart, you created a Prefect flow to get stars for a list of GitHub repositories.
And in Schedule a flow, you learned how to schedule runs of that flow on remote infrastructure.
In this tutorial, you’ll learn how to turn this flow into a resilient and performant data pipeline.
The real world is messy, and Prefect is designed to handle that messiness. For example:

- Your API requests can fail.
- Your API requests run too slowly.
- Your API requests run too quickly and you get rate limited.
- You waste time and money running the same tasks multiple times.
Instead of solving these problems in the business logic itself, use Prefect’s built-in features to handle them.
The first improvement you can make is to add retries to your flow.
Whenever an HTTP request fails, you can retry it a few times before giving up.
```python
import httpx

from prefect import task


@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if you don't get a 2xx status code
    return api_response.json()
```
Run the following code to see retries in action:
```python
import httpx

from prefect import flow, task  # Prefect flow and task decorators


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")


@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if you don't get a 2xx status code
    return api_response.json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars(["PrefectHQ/prefect", "pydantic/pydantic", "huggingface/transformers"])
```
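Retries fire immediately by default. If the upstream API needs time to recover, you can also space attempts out. Here's a minimal sketch, assuming your Prefect version provides the `retry_delay_seconds` task option and the `exponential_backoff` utility from `prefect.tasks`:

```python
from prefect import task
from prefect.tasks import exponential_backoff


# Wait roughly 10, 20, then 40 seconds between attempts;
# retry_jitter_factor randomizes each delay so simultaneous
# failures don't all retry at the same moment
@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
    retry_jitter_factor=1,
)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...
```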
If individual API requests are slow, you can speed them up in aggregate by making multiple requests concurrently.
When you call the `submit` method on a task, the task is submitted to a task runner for execution. The call returns a future immediately instead of blocking, and you resolve that future with `.result()` when you need the task's return value.
```python
from prefect import flow


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    # Task 1: Make HTTP requests concurrently
    repo_stats = []
    for repo in github_repos:
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)  # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result())  # Block until the task has completed
        print(f"{repo_name}: {stars} stars")
```
Run the following code to see concurrent tasks in action:
```python
import httpx

from prefect import flow, task  # Prefect flow and task decorators


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    # Task 1: Make HTTP requests concurrently
    repo_stats = []
    for repo in github_repos:
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)  # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result())  # Block until the task has completed
        print(f"{repo_name}: {stars} stars")


@task
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars(["PrefectHQ/prefect", "pydantic/pydantic", "huggingface/transformers"])
```
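Submitted tasks run on the flow's task runner, which defaults to a thread pool. If you want to cap how many tasks run at once on the client side, one option (a sketch, assuming Prefect 3's `ThreadPoolTaskRunner` and its `max_workers` parameter) is to configure the task runner on the flow:

```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner


# Run at most three submitted tasks at a time in this flow
@flow(log_prints=True, task_runner=ThreadPoolTaskRunner(max_workers=3))
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    # ...
```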
One consequence of running tasks concurrently is that you’re more likely to hit the rate limits of whatever API you’re using.
To avoid this, use Prefect to set a global concurrency limit.
```bash
# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
```
Now, you can use this global concurrency limit in your code:
```python
from prefect import flow
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop
        rate_limit("github-api")

        # Call Task 1
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)
        })

    # ...
```
Run the following code to see concurrency limits in action:
```python
import httpx

from prefect import flow, task  # Prefect flow and task decorators
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop
        rate_limit("github-api")

        # Call Task 1
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)
        })

    # Call Task 2: once each concurrent task completes, print the result
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result())
        print(f"{repo_name}: {stars} stars")


@task
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars(["PrefectHQ/prefect", "pydantic/pydantic", "huggingface/transformers"])
```
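Because global concurrency limits live on the Prefect server rather than in your code, you can adjust them without touching the flow. Assuming your Prefect version includes the other `gcl` subcommands alongside `create`, you can manage the limit from the terminal:

```bash
# List all global concurrency limits
prefect gcl ls

# Show the configuration of the limit created above
prefect gcl inspect github-api

# Raise the limit later (for example, after switching to authenticated requests)
prefect gcl update github-api --limit 5000
```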
For efficiency, you can skip tasks that have already run.
For example, if you don’t want to fetch the number of stars for a given repository more than once per day, you can cache those results for a day.
```python
from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS


@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...
```
Run the following code to see caching in action:
```python
from datetime import timedelta

import httpx

from prefect import flow, task  # Prefect flow and task decorators
from prefect.cache_policies import INPUTS


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")


@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars(["PrefectHQ/prefect", "pydantic/pydantic", "huggingface/transformers"])
```
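The `INPUTS` policy computes the cache key from the task's arguments, so any call with the same `github_repo` inside the expiration window reuses the stored result. Cache policies can also be combined; as a sketch, assuming the `TASK_SOURCE` policy exported by `prefect.cache_policies`, adding it makes edits to the task's source code invalidate old cache entries:

```python
from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE


# The cache key covers both the arguments and the task's source code,
# so changing the function body busts stale entries
@task(cache_policy=INPUTS + TASK_SOURCE, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...
```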
This is what your flow looks like after applying all of these improvements:
my_data_pipeline.py
```python
from datetime import timedelta

import httpx

from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    # Task 1: Make HTTP requests concurrently while respecting concurrency limits
    repo_stats = []
    for repo in github_repos:
        rate_limit("github-api")
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)  # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result())  # Block until the task has completed
        print(f"{repo_name}: {stars} stars")


@task(retries=3, cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if you don't get a 2xx status code
    return api_response.json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars(["PrefectHQ/prefect", "pydantic/pydantic", "huggingface/transformers"])
```
Run your flow twice: first to run the tasks and cache the results, then again to retrieve the results from the cache.
```bash
# Run the tasks and cache the results
python my_data_pipeline.py

# Retrieve the cached results
python my_data_pipeline.py
```
The terminal output from the second flow run should look like this:
```
09:08:12.265 | INFO    | prefect.engine - Created flow run 'laughing-nightingale' for flow 'show-stars'
09:08:12.266 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/541864e8-12f7-4890-9397-b2ed361f6b20
09:08:12.322 | INFO    | Task run 'fetch_stats-0c9' - Finished in state Cached(type=COMPLETED)
09:08:12.359 | INFO    | Task run 'fetch_stats-e89' - Finished in state Cached(type=COMPLETED)
09:08:12.360 | INFO    | Task run 'get_stars-b51' - Finished in state Completed()
09:08:12.361 | INFO    | Flow run 'laughing-nightingale' - PrefectHQ/prefect: 17320 stars
09:08:12.372 | INFO    | Task run 'fetch_stats-8ef' - Finished in state Cached(type=COMPLETED)
09:08:12.374 | INFO    | Task run 'get_stars-08d' - Finished in state Completed()
09:08:12.374 | INFO    | Flow run 'laughing-nightingale' - pydantic/pydantic: 186319 stars
09:08:12.387 | INFO    | Task run 'get_stars-2af' - Finished in state Completed()
09:08:12.387 | INFO    | Flow run 'laughing-nightingale' - huggingface/transformers: 134849 stars
09:08:12.404 | INFO    | Flow run 'laughing-nightingale' - Finished in state Completed()
```