In the Quickstart, you created a Prefect flow to get stars for a list of GitHub repositories. And in Schedule a flow, you learned how to schedule runs of that flow on remote infrastructure.

In this tutorial, you’ll learn how to turn this flow into a resilient and performant data pipeline. The real world is messy, and Prefect is designed to handle that messiness.

  • Your API requests can fail.
  • Your API requests run too slowly.
  • Your API requests run too quickly and you get rate limited.
  • You waste time and money running the same tasks multiple times.

Instead of solving these problems in the business logic itself, use Prefect’s built-in features to handle them.

Retry on failure

The first improvement you can make is to add retries to your flow. Whenever an HTTP request fails, you can retry it a few times before giving up.

from typing import Any

import httpx
from prefect import task


@task(retries=3)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if not a 2xx status code
    return api_response.json()
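
Retries fire back to back by default. If the API needs time to recover between attempts, the @task decorator also accepts a retry_delay_seconds argument (a single delay or a list of per-attempt delays). Here is a minimal sketch of that variant; the 1/10/100-second delays are just example values:

from typing import Any

import httpx
from prefect import task


# Same task as above, but wait 1s, then 10s, then 100s between retry attempts
@task(retries=3, retry_delay_seconds=[1, 10, 100])
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if not a 2xx status code
    return api_response.json()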

Concurrent execution of slow tasks

If individual API requests are slow, you can speed them up in aggregate by making multiple requests concurrently. When you call the map method on a task, you submit a list of arguments to the task runner to run concurrently (alternatively, you could .submit() each argument individually).

from prefect import flow

@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Task 1: Make HTTP requests concurrently
    stats_futures = fetch_stats.map(github_repos)

    # Task 2: Once each concurrent task completes, get the star counts
    stars = get_stars.map(stats_futures).result()

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")

Calling .result() on the list of futures returned by .map() will block until all tasks are complete.

Read more in the .map() documentation.
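
If you prefer to submit each call explicitly rather than using .map(), a roughly equivalent flow body built on .submit() could look like the sketch below (it assumes the same fetch_stats and get_stars tasks defined in this tutorial):

from prefect import flow


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars (explicit .submit() variant)"""

    # Submit one fetch_stats run per repository; each call returns a future immediately
    stats_futures = [fetch_stats.submit(repo) for repo in github_repos]

    # Pass each future to get_stars, then block on .result() to collect the counts
    stars = [get_stars.submit(future).result() for future in stats_futures]

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")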

Avoid getting rate limited

One consequence of running tasks concurrently is that you’re more likely to hit the rate limits of whatever API you’re using. To avoid this, use Prefect to set a global concurrency limit.

# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
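
To confirm the limit was created (or adjust it later), recent Prefect 3.x CLIs also provide ls and inspect subcommands for global concurrency limits:

# List all global concurrency limits
prefect gcl ls

# Show details of the limit you just created
prefect gcl inspect github-api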

Now, you can use this global concurrency limit in your code to rate limit your API requests.

from typing import Any

import httpx
from prefect import task
from prefect.concurrency.sync import rate_limit

@task
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    rate_limit("github-api")
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()

Cache the results of a task

For efficiency, you can skip tasks that have already run. For example, if you don’t want to fetch the number of stars for a given repository more than once per day, you can cache those results for a day.

from typing import Any
from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS

@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...
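
The INPUTS policy keys the cache on the task's arguments, so each distinct github_repo gets its own cached entry. If you also want cached results to be discarded whenever you edit the task's code, cache policies can be combined; a sketch assuming the TASK_SOURCE policy available in recent Prefect releases:

from typing import Any
from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

# Cache on both the task's arguments and its source code, for one day
@task(cache_policy=INPUTS + TASK_SOURCE, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...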

Run your improved flow

This is what your flow looks like after applying all of these improvements:

my_data_pipeline.py
from typing import Any
from datetime import timedelta

import httpx
from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit

@task(
    retries=3,
    cache_policy=INPUTS,
    cache_expiration=timedelta(days=1)
)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    rate_limit("github-api")
    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status()  # Force a retry if not a 2xx status code
    return api_response.json()


@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    """Task 2: Get the number of stars from GitHub repo statistics"""
    return repo_stats["stargazers_count"]


@flow(log_prints=True)
def show_stars(github_repos: list[str]) -> None:
    """Flow: Show number of GitHub repo stars"""

    # Task 1: Make HTTP requests concurrently
    stats_futures = fetch_stats.map(github_repos)

    # Task 2: Once each concurrent task completes, get the star counts
    stars = get_stars.map(stats_futures).result()

    # Show the results
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])

Run your flow twice: once to run the tasks and cache the results, then again to retrieve the results from the cache.

# Run the tasks and cache the results
python my_data_pipeline.py

# Run again (notice the cached results)
python my_data_pipeline.py

The terminal output from the second flow run should look like this:

20:03:04.398 | INFO | prefect.engine - Created flow run 'laughing-nightingale' for flow 'show-stars'
20:03:05.146 | INFO | Task run 'fetch_stats-90f' - Finished in state Cached(type=COMPLETED)
20:03:05.149 | INFO | Task run 'fetch_stats-258' - Finished in state Cached(type=COMPLETED)
20:03:05.153 | INFO | Task run 'fetch_stats-924' - Finished in state Cached(type=COMPLETED)
20:03:05.159 | INFO | Task run 'get_stars-3a9' - Finished in state Completed()
20:03:05.159 | INFO | Task run 'get_stars-ed3' - Finished in state Completed()
20:03:05.161 | INFO | Task run 'get_stars-39c' - Finished in state Completed()
20:03:05.162 | INFO | Flow run 'laughing-nightingale' - PrefectHQ/prefect: 17756 stars
20:03:05.163 | INFO | Flow run 'laughing-nightingale' - pydantic/pydantic: 21613 stars
20:03:05.163 | INFO | Flow run 'laughing-nightingale' - huggingface/transformers: 136166 stars
20:03:05.339 | INFO | Flow run 'laughing-nightingale' - Finished in state Completed()
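
If you ever need to ignore the cache and fetch fresh data (for example, while debugging), Prefect tasks accept a refresh_cache option; a minimal sketch, assuming the same task configuration as above:

from typing import Any
from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS

# Recompute and overwrite the cached value on every run
@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1), refresh_cache=True)
def fetch_stats(github_repo: str) -> dict[str, Any]:
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...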

Next steps

In this tutorial, you built a resilient and performant data pipeline that uses the following techniques:

  • Retries to handle transient API failures
  • Concurrent task execution to speed up slow requests
  • A global concurrency limit to avoid hitting API rate limits
  • Caching to skip work that has already been done

Next, learn how to handle data dependencies and ingest large amounts of data. You’ll use error handling, pagination, and nested flows to scrape data from GitHub.

Need help? Book a meeting with a Prefect Product Advocate to get your questions answered.