A Prefect task is a discrete unit of work in a Prefect workflow. You can turn almost any Python function into a task by adding the @task decorator. Tasks can:

  • Take inputs, perform work, and return outputs
  • Cache their execution across invocations
  • Encapsulate workflow logic into reusable units across flows
  • Receive metadata about upstream task dependencies and their state before running
  • Use automatic logging to capture runtime details, tags, and final state
  • Execute concurrently
  • Be defined in the same file as the flow or imported from modules
  • Be called from flows or other tasks

Flows and tasks share some common features:

  • They can be defined using their respective decorator, which accepts configuration settings (see all task settings and flow settings)
  • They can have a name, description, and tags for organization and bookkeeping
  • They provide retries, timeouts, and other hooks to handle failure and completion events (see the sketch below)
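
As a brief illustration of these shared features, here is a sketch of a task that combines several of them, including an on_failure hook; the notify_team callable is a hypothetical placeholder:

from prefect import task


def notify_team(task, task_run, state):
    # Hypothetical hook: Prefect calls failure hooks with the task,
    # the task run, and the final state when the task run fails.
    print(f"Task {task.name} finished in state {state.name}")


@task(
    name="fetch-data",
    description="Fetches data from an external system.",
    tags=["tutorial"],
    retries=2,
    timeout_seconds=30,
    on_failure=[notify_team],
)
def fetch_data():
    ...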

Example task

Here’s an example of a simple flow with a single task:

repo_info.py
import httpx
from prefect import flow, task
from typing import Any, Optional


@task
def get_url(url: str, params: Optional[dict[str, Any]] = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    repo_stats = get_url(url)
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
    print(f"Forks 🍴 : {repo_stats['forks_count']}")


if __name__ == "__main__":
    get_repo_info()

Running that flow in the terminal results in output like this:

09:55:55.412 | INFO    | prefect.engine - Created flow run 'great-ammonite' for flow 'get-repo-info'
09:55:55.499 | INFO    | Flow run 'great-ammonite' - Created task run 'get_url-0' for task 'get_url'
09:55:55.500 | INFO    | Flow run 'great-ammonite' - Executing 'get_url-0' immediately...
09:55:55.825 | INFO    | Task run 'get_url-0' - Finished in state Completed()
09:55:55.827 | INFO    | Flow run 'great-ammonite' - PrefectHQ/prefect repository statistics 🤓:
09:55:55.827 | INFO    | Flow run 'great-ammonite' - Stars 🌠 : 12157
09:55:55.827 | INFO    | Flow run 'great-ammonite' - Forks 🍴 : 1251
09:55:55.849 | INFO    | Flow run 'great-ammonite' - Finished in state Completed('All states completed.')

This task run is tracked in the UI as well.

Tasks are uniquely identified by a task key, which is a hash composed of the task name, the fully qualified name of the function, and any tags. If the task does not have a name specified, the name is derived from the decorated function object.
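
Assuming the task_key attribute that Prefect exposes on Task objects, you can inspect a task's key directly:

from prefect import task


@task
def my_task():
    pass


# Prints a key derived from the task name and the function's
# fully qualified name (exact format may vary by Prefect version)
print(my_task.task_key)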

How big should a task be?

Prefect encourages “small tasks.” Each one should represent a single logical step of your workflow. This allows Prefect to better contain task failures.

There’s nothing stopping you from putting all of your code in a single task. However, if any line of code fails, the entire task fails and must be retried from the beginning. Avoid this by splitting the code into multiple dependent tasks.
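
For example, rather than one monolithic task that extracts, transforms, and loads data, the same work can be sketched as three small dependent tasks (the bodies here are placeholders):

from prefect import flow, task


@task
def extract() -> list[int]:
    # Placeholder: fetch raw records from a source system
    return [1, 2, 3]


@task
def transform(records: list[int]) -> list[int]:
    # Placeholder: clean or reshape the records
    return [r * 2 for r in records]


@task
def load(records: list[int]) -> None:
    # Placeholder: write the records somewhere durable
    print(records)


@flow
def etl():
    # If transform fails, extract's completed work does not rerun on retry
    load(transform(extract()))


if __name__ == "__main__":
    etl()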

Supported functions

Almost any standard Python function can be turned into a Prefect task by adding the @task decorator.

Prefect uses client-side task run orchestration by default, which significantly improves performance, especially for workflows with many tasks. Task creation and state updates happen locally, reducing API calls to the Prefect server during execution. This enables efficient handling of large-scale workflows and improves reliability when server connectivity is intermittent.

Tasks execute in the main thread by default, unless a specific task runner is used to run them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling.
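
For instance, assuming Prefect 3's ThreadPoolTaskRunner, a flow can opt into thread-based execution for submitted tasks:

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner


@task
def compute(n: int) -> int:
    return n * n


# Submitted tasks run on worker threads instead of the main thread
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow():
    futures = [compute.submit(n) for n in range(4)]
    return [future.result() for future in futures]


if __name__ == "__main__":
    my_flow()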

Task updates are logged in batch, leading to eventual consistency for task states in the UI and API queries. While this means there may be a slight delay in seeing the most up-to-date task states, it allows for substantial performance improvements and increased workflow scale.

Synchronous functions

The simplest Prefect task is a synchronous Python function. Here’s an example of a synchronous task that prints a message:

from prefect import task


@task
def print_message():
    print("Hello, I'm a task")


if __name__ == "__main__":
    print_message()

Asynchronous functions

Prefect also supports asynchronous Python functions. The resulting tasks are coroutines that can be awaited or run concurrently, following standard async Python behavior.

from prefect import task
import asyncio


@task
async def print_message():
    await asyncio.sleep(1)
    print("Hello, I'm an async task")


asyncio.run(print_message())
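
Because async tasks are ordinary coroutines, you can run several of them concurrently with standard asyncio tools; a minimal sketch using asyncio.gather:

from prefect import task
import asyncio


@task
async def fetch(n: int) -> int:
    await asyncio.sleep(1)
    return n


async def main():
    # All three task runs execute concurrently
    results = await asyncio.gather(fetch(1), fetch(2), fetch(3))
    print(results)


asyncio.run(main())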

Class methods

Prefect supports synchronous and asynchronous methods as tasks, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator above the @task decorator:

from prefect import task


class MyClass:

    @task
    def my_instance_method(self):
        pass


    @classmethod
    @task
    def my_class_method(cls):
        pass


    @staticmethod
    @task
    def my_static_method():
        pass


MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()

Generators

Prefect supports synchronous and asynchronous generators as tasks. The task is considered to be Running as long as the generator is yielding values. When the generator is exhausted, the task is considered Completed. Any values yielded by the generator can be consumed by other tasks, and they will automatically record the generator task as their parent.

from prefect import task


@task
def generator():
    for i in range(10):
        yield i


@task
def consumer(x):
    print(x)


for val in generator():
    consumer(val)

Generator functions are consumed when returned from tasks

The result of a completed task must be serializable, but generators cannot be serialized. Therefore, if you return a generator from a task, the generator will be fully consumed and its yielded values will be returned as a list. This can lead to unexpected behavior or blocking if the generator is infinite or very large.

Here is an example of proactive generator consumption:

from prefect import task


def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')


@task
def f():
    return gen()


f()  # prints 'Generator consumed!'

If you need to return a generator without consuming it, you can yield it instead of using return. Values yielded from generator tasks are not considered final results and do not face the same serialization constraints:

from prefect import task


def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')


@task
def f():
    yield gen()


generator = next(f())
list(generator) # prints 'Generator consumed!'

Concurrency

Tasks enable concurrent execution, allowing you to execute multiple tasks asynchronously. This concurrency can greatly enhance the efficiency and performance of your workflows.

Expand the script to calculate the average open issues per user by making more requests:

repo_info.py
import httpx
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from typing import Any, Optional


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_url(url: str, params: Optional[dict[str, Any]] = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()


def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    issues = []
    # Ceiling division: the number of pages needed to fetch all open issues
    pages = range(1, -(open_issues_count // -per_page) + 1)
    for page in pages:
        issues.append(
            get_url(
                f"https://api.github.com/repos/{repo_name}/issues",
                params={"page": page, "per_page": per_page, "state": "open"},
            )
        )
    return [i for p in issues for i in p]


@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
    issues = get_open_issues(repo_name, repo_stats["open_issues_count"])
    issues_per_user = len(issues) / len(set([i["user"]["id"] for i in issues]))
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
    print(f"Forks 🍴 : {repo_stats['forks_count']}")
    print(f"Average open issues per user 💌 : {issues_per_user:.2f}")


if __name__ == "__main__":
    get_repo_info()

Now you’re fetching the data you need, but the requests happen sequentially. Tasks expose a submit method that changes the execution from sequential to concurrent. In this example, you also need to use the result method to unpack a list of return values:

def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
    issues = []
    pages = range(1, -(open_issues_count // -per_page) + 1)
    for page in pages:
        issues.append(
            get_url.submit(
                f"https://api.github.com/repos/{repo_name}/issues",
                params={"page": page, "per_page": per_page, "state": "open"},
            )
        )
    return [i for p in issues for i in p.result()]

The logs show that each task is running concurrently:

12:45:28.241 | INFO    | prefect.engine - Created flow run 'intrepid-coua' for flow 'get-repo-info'
12:45:28.311 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-0' for task 'get_url'
12:45:28.312 | INFO    | Flow run 'intrepid-coua' - Executing 'get_url-0' immediately...
12:45:28.543 | INFO    | Task run 'get_url-0' - Finished in state Completed()
12:45:28.583 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-1' for task 'get_url'
12:45:28.584 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-1' for execution.
12:45:28.594 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-2' for task 'get_url'
12:45:28.594 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-2' for execution.
12:45:28.609 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-4' for task 'get_url'
12:45:28.610 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-4' for execution.
12:45:28.624 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-5' for task 'get_url'
12:45:28.625 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-5' for execution.
12:45:28.640 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-6' for task 'get_url'
12:45:28.641 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-6' for execution.
12:45:28.708 | INFO    | Flow run 'intrepid-coua' - Created task run 'get_url-3' for task 'get_url'
12:45:28.708 | INFO    | Flow run 'intrepid-coua' - Submitted task run 'get_url-3' for execution.
12:45:29.096 | INFO    | Task run 'get_url-6' - Finished in state Completed()
12:45:29.565 | INFO    | Task run 'get_url-2' - Finished in state Completed()
12:45:29.721 | INFO    | Task run 'get_url-5' - Finished in state Completed()
12:45:29.749 | INFO    | Task run 'get_url-4' - Finished in state Completed()
12:45:29.801 | INFO    | Task run 'get_url-3' - Finished in state Completed()
12:45:29.817 | INFO    | Task run 'get_url-1' - Finished in state Completed()
12:45:29.820 | INFO    | Flow run 'intrepid-coua' - PrefectHQ/prefect repository statistics 🤓:
12:45:29.820 | INFO    | Flow run 'intrepid-coua' - Stars 🌠 : 12159
12:45:29.821 | INFO    | Flow run 'intrepid-coua' - Forks 🍴 : 1251
12:45:29.822 | INFO    | Flow run 'intrepid-coua' - Average open issues per user 💌 : 2.27
12:45:29.838 | INFO    | Flow run 'intrepid-coua' - Finished in state Completed('All states completed.')

Task configuration

Tasks allow for customization through optional arguments that can be provided to the task decorator.

  • name: An optional name for the task. If not provided, the name is inferred from the function name.
  • description: An optional string description for the task. If not provided, the description is pulled from the docstring of the decorated function.
  • tags: An optional set of tags associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime.
  • timeout_seconds: An optional number of seconds indicating a maximum runtime for the task. If the task exceeds this runtime, it is marked as failed.
  • cache_key_fn: An optional callable that, given the task run context and call parameters, generates a string key. If the key matches that of a previous completed state, that state's result is restored instead of running the task again.
  • cache_policy: An optional policy that determines what information is used to generate cache keys. Available policies include INPUTS, TASK_SOURCE, RUN_ID, FLOW_PARAMETERS, and NONE. Policies can be combined using the + operator.
  • cache_expiration: An optional amount of time indicating how long cached states for this task are restorable; if not provided, cached states never expire.
  • retries: An optional number of times to retry on task run failure.
  • retry_delay_seconds: An optional number of seconds to wait before retrying the task after failure. Only applicable if retries is nonzero.
  • log_prints: An optional boolean indicating whether to log print statements.

See all possible options in the Python SDK docs.
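
As an illustration of combining cache policies (assuming Prefect 3's prefect.cache_policies module), the following task reruns only when its inputs or its source code change:

from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE


@task(cache_policy=INPUTS + TASK_SOURCE)
def expensive_computation(x: int) -> int:
    return x ** 2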

For example, provide optional name and description arguments to a task:

@task(name="hello-task", description="This task says hello.")
def my_task():
    print("Hello, I'm a task")

Distinguish runs of this task by providing a task_run_name.
Python’s standard string formatting syntax applies:

import datetime
from prefect import flow, task


@task(name="My Example Task", 
      description="An example task for a tutorial.",
      task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
    pass


@flow
def my_flow():
    # creates a run with a name like "hello-marvin-on-Thursday"
    my_task(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))

if __name__ == "__main__":
    my_flow()

Additionally, this setting accepts a function that returns a string for the task run name:

import datetime
from prefect import flow, task


def generate_task_name():
    date = datetime.datetime.now(datetime.timezone.utc)
    return f"{date:%A}-is-a-lovely-day"


@task(name="My Example Task",
      description="An example task for the docs.",
      task_run_name=generate_task_name)
def my_task(name):
    pass


@flow
def my_flow():
    # creates a run with a name like "Thursday-is-a-lovely-day"
    my_task(name="marvin")


if __name__ == "__main__":
    my_flow()  

If you need access to information about the task, use the prefect.runtime module. For example:

from prefect import flow, task
from prefect.runtime import flow_run, task_run


def generate_task_name():
    flow_name = flow_run.flow_name
    task_name = task_run.task_name

    parameters = task_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-{task_name}-with-{name}-and-{limit}"


@task(name="my-example-task",
      description="An example task for a tutorial.",
      task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
    pass


@flow
def my_flow(name: str):
    # creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
    my_task(name="marvin")

Tags

Tags are optional string labels that enable you to identify and group tasks by criteria other than name or flow. Tags are useful for filtering task runs in the UI and via the API, and for setting concurrency limits on task runs by tag.

You may specify tags as a keyword argument on the task decorator:

@task(name="hello-task", tags=["test"])
def my_task():
    print("Hello, I'm a task")

Alternatively, specify tags when the task is called rather than in its definition by using the tags context manager:

from prefect import flow, task, tags


@task
def my_task():
    print("Hello, I'm a task")


@flow
def my_flow():
    with tags("test"):
        my_task()


if __name__ == "__main__":
    my_flow()

Timeouts

Task timeouts prevent unintentional long-running tasks. When a task's execution exceeds the specified timeout duration, a timeout exception is raised and the task is marked as failed. In the UI, the task is visibly designated as TimedOut. From the perspective of the flow, the timed-out task is treated like any other failed task.

Specify timeout durations with the timeout_seconds keyword argument:

from prefect import task
import time


@task(timeout_seconds=1, log_prints=True)
def show_timeouts():
    print("I will execute")
    time.sleep(5)
    print("I will not execute")

Retries

Prefect can automatically retry task runs on failure. A task run fails if its Python function raises an exception.

To enable retries, pass retries and retry_delay_seconds arguments to your task. If the task run fails, Prefect will retry it up to retries times, waiting retry_delay_seconds seconds between each attempt. If the task fails on the final retry, Prefect marks the task as failed.

A new task run is not created when a task is retried. Instead, a new state is added to the state history of the original task run.

Retries are often useful in cases that depend upon external systems, such as making an API request. The example below uses the httpx library to make an HTTP request.

import httpx
from prefect import flow, task


@task(retries=2, retry_delay_seconds=5)
def get_data_task(
    url: str = "https://api.brittle-service.com/endpoint"
) -> dict:
    response = httpx.get(url)
    
    # If the response status code is anything but a 2xx, httpx will raise
    # an exception. This task doesn't handle the exception, so Prefect will
    # catch the exception and will consider the task run failed.
    response.raise_for_status()
    
    return response.json()
    

@flow
def get_data_flow():
    get_data_task()


if __name__ == "__main__":
    get_data_flow()

In this task, if the HTTP request to the brittle API receives any status code other than a 2xx (200, 201, etc.), Prefect will retry the task a maximum of two times, waiting five seconds in between retries.

Custom retry behavior

The retry_delay_seconds option accepts a list of integers for customized retry behavior. The following task will wait for successively increasing intervals of 1, 10, and 100 seconds, respectively, before the next attempt starts:

from prefect import task


@task(retries=3, retry_delay_seconds=[1, 10, 100])
def some_task_with_manual_backoff_retries():
    ...  # rest of task body

The retry_condition_fn argument accepts a callable that returns a boolean. If the callable returns True, the task will be retried. If the callable returns False, the task will not be retried. The callable accepts three arguments: the task, the task run, and the state of the task run. The following task will retry on HTTP status codes other than 401 or 404:

import httpx
from prefect import flow, task


def retry_handler(task, task_run, state) -> bool:
    """Custom retry handler that specifies when to retry a task"""
    try:
        # Attempt to get the result of the task
        state.result()
    except httpx.HTTPStatusError as exc:
        # Retry on any HTTP status code that is not 401 or 404
        do_not_retry_on_these_codes = [401, 404]
        return exc.response.status_code not in do_not_retry_on_these_codes
    except httpx.ConnectError:
        # Do not retry
        return False
    except Exception:
        # For any other exception, retry
        return True


@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()


@flow
def get_data_flow(url):
    my_api_call_task(url=url)


if __name__ == "__main__":
    get_data_flow(url="https://httpbin.org/status/503")

Additionally, you can pass a callable that accepts the number of retries as an argument and returns a list. Prefect includes an exponential_backoff utility that automatically generates a list of retry delays corresponding to an exponential backoff strategy. The following task will wait for 10, 20, then 40 seconds before each retry:

from prefect import task
from prefect.tasks import exponential_backoff


@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=10))
def some_task_with_exponential_backoff_retries():
    ...  # rest of task body

Add “jitter” to avoid thundering herds

You can add jitter to retry delay times. Jitter is a random amount of time added to retry periods that helps prevent “thundering herd” scenarios, which is when many tasks retry at the same time, potentially overwhelming systems.

The retry_jitter_factor option can be used to add variance to the base delay. For example, a retry delay of 10 seconds with a retry_jitter_factor of 0.5 will allow a delay up to 15 seconds. Large values of retry_jitter_factor provide more protection against “thundering herds,” while keeping the average retry delay time constant. For example, the following task adds jitter to its exponential backoff so the retry delays will vary up to a maximum delay time of 20, 40, and 80 seconds respectively.

from prefect import task
from prefect.tasks import exponential_backoff


@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
    retry_jitter_factor=1,
)
def some_task_with_exponential_backoff_retries():
    ...  # rest of task body

Configure retry behavior globally

Set default retries and retry delays globally through settings. These settings will not override the retries or retry_delay_seconds that are set in the task decorator.

prefect config set PREFECT_TASK_DEFAULT_RETRIES=2
prefect config set PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS="[1, 10, 100]"