Learn how to build resilient and performant data pipelines with Prefect.
In the Quickstart, you created a Prefect flow to get stars for a list of GitHub repositories.
And in Schedule a flow, you learned how to schedule runs of that flow on remote infrastructure.
In this tutorial, you’ll learn how to turn this flow into a resilient and performant data pipeline.
The real world is messy, and Prefect is designed to handle that messiness. For example:

- Your API requests can fail.
- Your API requests run too slowly.
- Your API requests run too quickly and you get rate limited.
- You waste time and money running the same tasks multiple times.
Instead of solving these problems in the business logic itself, use Prefect’s built-in features to handle them.
The first improvement you can make is to add retries to your flow.
Whenever an HTTP request fails, you can retry it a few times before giving up.
```python
from typing import Any

import httpx
from prefect import task


@task(retries=3)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if not a 2xx status code
    return api_response.json()
```
Run the following code to see retries in action:
```python
from typing import Any

import httpx
from prefect import flow, task  # Prefect flow and task decorators


@task(retries=3)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if not a 2xx status code
    return api_response.json()


@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats['stargazers_count']


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    for repo in github_repos:
        # Call Task 1
        repo_stats = fetch_stats(repo)

        # Call Task 2
        stars = get_stars(repo_stats)

        # Print the result
        print(f"{repo}: {stars} stars")


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
```
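By default, retries run back-to-back. If the API needs time to recover between attempts, the `@task` decorator also accepts a `retry_delay_seconds` argument. Here is a minimal sketch; the delay values are arbitrary and only illustrate the idea:

```python
from typing import Any

import httpx
from prefect import task


# Wait 2s, then 10s, then 30s between retries (example values, not a recommendation)
@task(retries=3, retry_delay_seconds=[2, 10, 30])
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if not a 2xx status code
    return api_response.json()
```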
If individual API requests are slow, you can speed them up in aggregate by making multiple requests concurrently.
When you call the .map() method on a task, you submit a list of arguments to the task runner, which runs the task concurrently for each argument (alternatively, you could .submit() each argument individually; see the sketch after the next example).
```python
from prefect import flow


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Task 1: Make HTTP requests concurrently
    stats_futures = fetch_stats.map(github_repos)

    # Task 2: Once each concurrent task completes, get the star counts
    stars = get_stars.map(stats_futures).result()

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")
```
Run the following code to see concurrent tasks in action:
```python
from typing import Any

import httpx
from prefect import flow, task


@task(retries=3)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats["stargazers_count"]


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Task 1: Make HTTP requests concurrently
    stats_futures = fetch_stats.map(github_repos)

    # Task 2: Once each concurrent task completes, get the star counts
    stars = get_stars.map(stats_futures).result()

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")


if __name__ == "__main__":
    # Run the flow
    show_stars(
        [
            "PrefectHQ/prefect",
            "pydantic/pydantic",
            "huggingface/transformers"
        ]
    )
```
Calling .result() on the list of futures returned by .map() will block until all tasks are complete.
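For comparison, here is a minimal sketch of the .submit() alternative mentioned above. It assumes the same fetch_stats and get_stars tasks defined earlier, and collects each result individually instead of through a single .result() call on the mapped futures:

```python
from prefect import flow


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Submit one task run per repo; each .submit() returns a future immediately
    stats_futures = [fetch_stats.submit(repo) for repo in github_repos]

    # Futures can be passed straight into downstream tasks
    stars_futures = [get_stars.submit(stats) for stats in stats_futures]

    # Block on each future individually and print the results
    for repo, star_future in zip(github_repos, stars_futures):
        print(f"{repo}: {star_future.result()} stars")
```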
One consequence of running tasks concurrently is that you’re more likely to hit the rate limits of whatever API you’re using.
To avoid this, use Prefect to set a global concurrency limit.
```bash
# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
```
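If you want to confirm the limit was created, the prefect gcl command group should let you list and inspect it (the exact output format may vary by Prefect version):

```bash
# List all global concurrency limits
prefect gcl ls

# Show details for the github-api limit
prefect gcl inspect github-api
```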
Now, you can use this global concurrency limit in your code to rate limit your API requests.
```python
from typing import Any

import httpx
from prefect import task
from prefect.concurrency.sync import rate_limit


@task
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    rate_limit("github-api")
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()
```
Run the following code to see concurrency limits in action:
```python
from typing import Any

import httpx
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task(retries=3)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    rate_limit("github-api")
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats["stargazers_count"]


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Task 1: Make HTTP requests concurrently
    stats_futures = fetch_stats.map(github_repos)

    # Task 2: Once each concurrent task completes, get the star counts
    stars = get_stars.map(stats_futures).result()

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
```
For efficiency, you can skip tasks that have already run.
For example, if you don’t want to fetch the number of stars for a given repository more than once per day, you can cache those results for a day.
```python
from typing import Any
from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS


@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...
```
Run the following code to see caching in action:
```python
from typing import Any
from datetime import timedelta

import httpx
from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit


@task(
    retries=3,
    cache_policy=INPUTS,
    cache_expiration=timedelta(days=1)
)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    rate_limit("github-api")
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats["stargazers_count"]


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Task 1: Make HTTP requests concurrently
    stats_futures = fetch_stats.map(github_repos)

    # Task 2: Once each concurrent task completes, get the star counts
    stars = get_stars.map(stats_futures).result()

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
```
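One caveat worth knowing: if you ever need to bypass a still-valid cache entry (for example, after fixing a bug in the task), Prefect tasks accept a refresh_cache option that forces the task to run again and overwrite the cached value. A minimal sketch, using the same task as above:

```python
from typing import Any
from datetime import timedelta

import httpx
from prefect import task
from prefect.cache_policies import INPUTS


# refresh_cache=True forces the task to recompute and update the cached result
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1), refresh_cache=True)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()
```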
This is what your flow looks like after applying all of these improvements:
my_data_pipeline.py
```python
from typing import Any
from datetime import timedelta

import httpx
from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit


@task(
    retries=3,
    cache_policy=INPUTS,
    cache_expiration=timedelta(days=1)
)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    rate_limit("github-api")
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats["stargazers_count"]


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Task 1: Make HTTP requests concurrently
    stats_futures = fetch_stats.map(github_repos)

    # Task 2: Once each concurrent task completes, get the star counts
    stars = get_stars.map(stats_futures).result()

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])
```
Run your flow twice: once to run the tasks and cache the results, then again to retrieve the results from the cache.
```bash
# Run the tasks and cache the results
python my_data_pipeline.py

# Run again (notice the cached results)
python my_data_pipeline.py
```
The terminal output from the second flow run should look like this:
```
20:03:04.398 | INFO | prefect.engine - Created flow run 'laughing-nightingale' for flow 'show-stars'
20:03:05.146 | INFO | Task run 'fetch_stats-90f' - Finished in state Cached(type=COMPLETED)
20:03:05.149 | INFO | Task run 'fetch_stats-258' - Finished in state Cached(type=COMPLETED)
20:03:05.153 | INFO | Task run 'fetch_stats-924' - Finished in state Cached(type=COMPLETED)
20:03:05.159 | INFO | Task run 'get_stars-3a9' - Finished in state Completed()
20:03:05.159 | INFO | Task run 'get_stars-ed3' - Finished in state Completed()
20:03:05.161 | INFO | Task run 'get_stars-39c' - Finished in state Completed()
20:03:05.162 | INFO | Flow run 'laughing-nightingale' - PrefectHQ/prefect: 17756 stars
20:03:05.163 | INFO | Flow run 'laughing-nightingale' - pydantic/pydantic: 21613 stars
20:03:05.163 | INFO | Flow run 'laughing-nightingale' - huggingface/transformers: 136166 stars
20:03:05.339 | INFO | Flow run 'laughing-nightingale' - Finished in state Completed()
```