In the Quickstart, you created a Prefect flow to get stars for a list of GitHub repositories. And in Schedule a flow, you learned how to schedule runs of that flow on remote infrastructure.

In this tutorial, you’ll learn how to turn this flow into a resilient and performant data pipeline. The real world is messy, and Prefect is designed to handle that messiness.

  • Your API requests can fail.
  • Your API requests can run too slowly.
  • Your API requests can run too quickly and hit rate limits.
  • You can waste time and money re-running the same tasks.

Instead of solving these problems in the business logic itself, use Prefect’s built-in features to handle them.

Retry on failure

The first improvement you can make is to add retries to your flow. Whenever an HTTP request fails, you can retry it a few times before giving up.

import httpx

from prefect import task

@task(retries=3)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()
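
If you also want to wait between attempts, the task decorator accepts a retry_delay_seconds option. Here is a minimal sketch; the 5-second delay is only an illustrative value:

import httpx

from prefect import task

@task(retries=3, retry_delay_seconds=5) # Wait 5 seconds between attempts (illustrative value)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()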

Concurrent execution of slow tasks

If individual API requests are slow, you can speed them up in aggregate by making multiple requests concurrently. When you call the submit method on a task, the task is submitted to a task runner for execution.

from prefect import flow

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    # Task 1: Make HTTP requests concurrently
    repo_stats = []
    for repo in github_repos:
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")
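
Submitted tasks run on the flow's task runner, which by default executes them in a thread pool. If you want to cap how many tasks run at once from the flow itself, one option (a sketch assuming Prefect 3's ThreadPoolTaskRunner; the max_workers value is illustrative) is to configure the task runner on the flow:

from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(log_prints=True, task_runner=ThreadPoolTaskRunner(max_workers=5)) # At most 5 tasks run concurrently
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""
    # ... same body as above ...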

Avoid getting rate limited

One consequence of running tasks concurrently is that you’re more likely to hit the rate limits of whatever API you’re using. To avoid this, use Prefect to set a global concurrency limit.

# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
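
You can review the limit from the CLI as well; the subcommands below are assumptions based on recent Prefect releases, so check prefect gcl --help for your version:

# List all global concurrency limits
prefect gcl ls

# Show the details of the limit you just created
prefect gcl inspect github-api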

Now, you can use this global concurrency limit in your code:

from prefect import flow
from prefect.concurrency.sync import rate_limit

@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    repo_stats = []
    for repo in github_repos:
        # Apply the concurrency limit to this loop
        rate_limit("github-api")

        # Call Task 1
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo)
        })

        # ...

Cache the results of a task

For efficiency, you can skip tasks that have already run. For example, if you don’t want to fetch the number of stars for a given repository more than once per day, you can cache those results for a day.

from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS

@task(cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...
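
Cache policies can also be composed. For example, to invalidate the cache whenever the task's source code changes as well, one approach (a sketch assuming Prefect 3's composable cache policies and the TASK_SOURCE policy) is:

from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

# Reuse a cached result only if the inputs and the task's source code are both unchanged
@task(cache_policy=INPUTS + TASK_SOURCE, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...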

Run your improved flow

This is what your flow looks like after applying all of these improvements:

my_data_pipeline.py
from datetime import timedelta
import httpx

from prefect import flow, task
from prefect.cache_policies import INPUTS
from prefect.concurrency.sync import rate_limit


@flow(log_prints=True)
def show_stars(github_repos: list[str]):
    """Flow: Show the number of stars that GitHub repos have"""

    # Task 1: Make HTTP requests concurrently while respecting concurrency limits
    repo_stats = []
    for repo in github_repos:
        rate_limit("github-api")
        repo_stats.append({
            'repo': repo,
            'task': fetch_stats.submit(repo) # Submit each task to a task runner
        })

    # Task 2: Once each concurrent task completes, show the results
    for repo in repo_stats:
        repo_name = repo['repo']
        stars = get_stars(repo['task'].result()) # Block until the task has completed
        print(f"{repo_name}: {stars} stars")


@task(retries=3, cache_policy=INPUTS, cache_expiration=timedelta(days=1))
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""

    api_response = httpx.get(f"https://api.github.com/repos/{github_repo}")
    api_response.raise_for_status() # Force a retry if you don't get a 2xx status code
    return api_response.json()


@task
def get_stars(repo_stats: dict):
    """Task 2: Get the number of stars from GitHub repo statistics"""

    return repo_stats['stargazers_count']


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])

Run your flow twice: once to run the tasks and cache the results, then again to retrieve the results from the cache.

# Run the tasks and cache the results
python my_data_pipeline.py

# Retrieve the cached results
python my_data_pipeline.py

The terminal output from the second flow run should look like this:

09:08:12.265 | INFO    | prefect.engine - Created flow run 'laughing-nightingale' for flow 'show-stars'
09:08:12.266 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/541864e8-12f7-4890-9397-b2ed361f6b20
09:08:12.322 | INFO    | Task run 'fetch_stats-0c9' - Finished in state Cached(type=COMPLETED)
09:08:12.359 | INFO    | Task run 'fetch_stats-e89' - Finished in state Cached(type=COMPLETED)
09:08:12.360 | INFO    | Task run 'get_stars-b51' - Finished in state Completed()
09:08:12.361 | INFO    | Flow run 'laughing-nightingale' - PrefectHQ/prefect: 17320 stars
09:08:12.372 | INFO    | Task run 'fetch_stats-8ef' - Finished in state Cached(type=COMPLETED)
09:08:12.374 | INFO    | Task run 'get_stars-08d' - Finished in state Completed()
09:08:12.374 | INFO    | Flow run 'laughing-nightingale' - pydantic/pydantic: 186319 stars
09:08:12.387 | INFO    | Task run 'get_stars-2af' - Finished in state Completed()
09:08:12.387 | INFO    | Flow run 'laughing-nightingale' - huggingface/transformers: 134849 stars
09:08:12.404 | INFO    | Flow run 'laughing-nightingale' - Finished in state Completed()
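
If you ever need fresh data before the cache expires, tasks also accept a refresh_cache option that re-runs the task and overwrites the cached value; this sketch assumes the same fetch_stats task as above:

from datetime import timedelta

from prefect import task
from prefect.cache_policies import INPUTS

# refresh_cache=True forces the task to run and replace any cached result
@task(retries=3, cache_policy=INPUTS, cache_expiration=timedelta(days=1), refresh_cache=True)
def fetch_stats(github_repo: str):
    """Task 1: Fetch the statistics for a GitHub repo"""
    # ...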

Next steps

In this tutorial, you built a resilient and performant data pipeline that uses the following techniques:

  • Retries to handle transient API failures.
  • Concurrent task execution to speed up slow requests.
  • A global concurrency limit to avoid hitting API rate limits.
  • Caching to skip tasks that have already run.

Next, learn how to handle data dependencies and ingest large amounts of data. You’ll use error handling, pagination, and nested flows to scrape data from GitHub.

Need help? Book a meeting with a Prefect Product Advocate to get your questions answered.