Write and run tasks
Learn the basics of writing tasks.
A Prefect task is a Python function decorated with @task
that represents a discrete unit of work in a
Prefect workflow. Tasks can:
- Take inputs, perform work, and return outputs
- Cache their execution across invocations
- Encapsulate workflow logic into reusable units across flows
- Receive metadata about upstream task dependencies and their state before running
- Use automatic logging to capture runtime details, tags, and final state
- Execute concurrently
- Be defined in the same file as the flow or imported from modules
- Be called from flows or other tasks (Prefect 2.18+)
Flows and tasks share some common features:
- They can be defined using their respective decorator, which accepts configuration settings (see all task settings and flow settings)
- They can have a name, description, and tags for organization and bookkeeping
- They provide retries, timeouts, and other hooks to handle failure and completion events
Example task
Here’s an example of what it looks like to move a request from a flow into a task:
import httpx
from prefect import flow, task
from typing import Optional
@task
def get_url(url: str, params: Optional[dict[str, any]] = None):
response = httpx.get(url, params=params)
response.raise_for_status()
return response.json()
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
url = f"https://api.github.com/repos/{repo_name}"
repo_stats = get_url(url)
print(f"{repo_name} repository statistics 🤓:")
print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
print(f"Forks 🍴 : {repo_stats['forks_count']}")
if __name__ == "__main__":
get_repo_info()
Running that flow in the terminal results in something like this:
09:55:55.412 | INFO | prefect.engine - Created flow run 'great-ammonite' for flow 'get-repo-info'
09:55:55.499 | INFO | Flow run 'great-ammonite' - Created task run 'get_url-0' for task 'get_url'
09:55:55.500 | INFO | Flow run 'great-ammonite' - Executing 'get_url-0' immediately...
09:55:55.825 | INFO | Task run 'get_url-0' - Finished in state Completed()
09:55:55.827 | INFO | Flow run 'great-ammonite' - PrefectHQ/prefect repository statistics 🤓:
09:55:55.827 | INFO | Flow run 'great-ammonite' - Stars 🌠 : 12157
09:55:55.827 | INFO | Flow run 'great-ammonite' - Forks 🍴 : 1251
09:55:55.849 | INFO | Flow run 'great-ammonite' - Finished in state Completed('All states completed.')
This task run is tracked in the UI as well.
Supported functions
Almost any standard Python function can be turned into a Prefect task by adding the @task
decorator.
Tasks are always executed in the main thread by default, unless a specific task runner is used to execute them on different threads, processes, or infrastructure. This facilitates native Python debugging and profiling.
Synchronous functions
The simplest Prefect task is a synchronous Python function. Here’s an example of a synchronous task that prints a message:
from prefect import task
@task
def print_message():
print("Hello, I'm a task")
print_message()
Asynchronous functions
Prefect also supports asynchronous Python functions. The resulting tasks are coroutines that can be awaited or run concurrently, following standard async Python behavior.
from prefect import task
import asyncio
@task
async def print_message():
await asyncio.sleep(1)
print("Hello, I'm an async task")
asyncio.run(print_message())
Class Methods
Prefect supports snchronous and asynchronous methods as tasks, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator above the @task
decorator:
from prefect import task
class MyClass:
@task
def my_instance_method(self):
pass
@classmethod
@task
def my_class_method(cls):
pass
@staticmethod
@task
def my_static_method():
pass
MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
Generators
Prefect supports synchronous and asynchronous generators as tasks. The task is considered to be Running
as long as the generator is yielding values. When the generator is exhausted, the task is considered Completed
. Any values yielded by the generator can be consumed by other tasks, and they will automatically record the generator task as their parent.
from prefect import task
@task
def generator():
for i in range(10):
yield i
@task
def consumer(x):
print(x)
for val in generator():
consumer(val)
Generator functions are consumed when returned from tasks
The result of a completed task must be serializable, but generators cannot be serialized. Therefore, if you return a generator from a task, the generator will be fully consumed and its yielded values will be returned as a list. This can lead to unexpected behavior or blocking if the generator is infinite or very large.
Here is an example of proactive generator consumption:
from prefect import task
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@task
def f():
return gen()
f() # prints 'Generator consumed!'
If you need to return a generator without consuming it, you can yield
it instead of using return
.
Values yielded from generator tasks are not considered final results and do not face the same serialization constraints:
from prefect import task
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@task
def f():
yield gen()
generator = next(f())
list(generator) # prints 'Generator consumed!'
Concurrency
Tasks enable concurrent execution, allowing you to execute multiple tasks asynchronously. This concurrency can greatly enhance the efficiency and performance of your workflows.
Expand the script to calculate the average open issues per user by making more requests:
import httpx
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from typing import Optional
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_url(url: str, params: Optional[dict[str, any]] = None):
response = httpx.get(url, params=params)
response.raise_for_status()
return response.json()
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
issues = []
pages = range(1, -(open_issues_count // -per_page) + 1)
for page in pages:
issues.append(
get_url(
f"https://api.github.com/repos/{repo_name}/issues",
params={"page": page, "per_page": per_page, "state": "open"},
)
)
return [i for p in issues for i in p]
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
repo_stats = get_url(f"https://api.github.com/repos/{repo_name}")
issues = get_open_issues(repo_name, repo_stats["open_issues_count"])
issues_per_user = len(issues) / len(set([i["user"]["id"] for i in issues]))
print(f"{repo_name} repository statistics 🤓:")
print(f"Stars 🌠 : {repo_stats['stargazers_count']}")
print(f"Forks 🍴 : {repo_stats['forks_count']}")
print(f"Average open issues per user 💌 : {issues_per_user:.2f}")
if __name__ == "__main__":
get_repo_info()
Now you’re fetching the data you need, but the requests happen sequentially.
Tasks expose a submit
method that changes
the execution from sequential to concurrent.
In this example, you also need to use the
result
method to unpack a list of return values:
def get_open_issues(repo_name: str, open_issues_count: int, per_page: int = 100):
issues = []
pages = range(1, -(open_issues_count // -per_page) + 1)
for page in pages:
issues.append(
get_url.submit(
f"https://api.github.com/repos/{repo_name}/issues",
params={"page": page, "per_page": per_page, "state": "open"},
)
)
return [i for p in issues for i in p.result()]
The logs show that each task is running concurrently:
12:45:28.241 | INFO | prefect.engine - Created flow run 'intrepid-coua' for flow 'get-repo-info'
12:45:28.311 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-0' for task 'get_url'
12:45:28.312 | INFO | Flow run 'intrepid-coua' - Executing 'get_url-0' immediately...
12:45:28.543 | INFO | Task run 'get_url-0' - Finished in state Completed()
12:45:28.583 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-1' for task 'get_url'
12:45:28.584 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-1' for execution.
12:45:28.594 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-2' for task 'get_url'
12:45:28.594 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-2' for execution.
12:45:28.609 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-4' for task 'get_url'
12:45:28.610 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-4' for execution.
12:45:28.624 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-5' for task 'get_url'
12:45:28.625 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-5' for execution.
12:45:28.640 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-6' for task 'get_url'
12:45:28.641 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-6' for execution.
12:45:28.708 | INFO | Flow run 'intrepid-coua' - Created task run 'get_url-3' for task 'get_url'
12:45:28.708 | INFO | Flow run 'intrepid-coua' - Submitted task run 'get_url-3' for execution.
12:45:29.096 | INFO | Task run 'get_url-6' - Finished in state Completed()
12:45:29.565 | INFO | Task run 'get_url-2' - Finished in state Completed()
12:45:29.721 | INFO | Task run 'get_url-5' - Finished in state Completed()
12:45:29.749 | INFO | Task run 'get_url-4' - Finished in state Completed()
12:45:29.801 | INFO | Task run 'get_url-3' - Finished in state Completed()
12:45:29.817 | INFO | Task run 'get_url-1' - Finished in state Completed()
12:45:29.820 | INFO | Flow run 'intrepid-coua' - PrefectHQ/prefect repository statistics 🤓:
12:45:29.820 | INFO | Flow run 'intrepid-coua' - Stars 🌠 : 12159
12:45:29.821 | INFO | Flow run 'intrepid-coua' - Forks 🍴 : 1251
Average open issues per user 💌 : 2.27
12:45:29.838 | INFO | Flow run 'intrepid-coua' - Finished in state Completed('All states completed.')
Call a task from a flow
Use the @task
decorator to designate a function as a task. Calling the task creates a new task run:
from prefect import flow, task
@task
def my_task():
print("Hello, I'm a task")
@flow
def my_flow():
my_task()
Call a task from another task
A task can be called from within another task:
from prefect import task
@task
def my_task():
print("Hello, I'm a task")
@task(log_prints=True)
def my_parent_task():
my_task()
Tasks are uniquely identified by a task key, which is a hash composed of the task name, the fully qualified name of the function, and any tags. If the task does not have a name specified, the name is derived from the task function.
How big should a task be?
Prefect encourages “small tasks.” Each one should represent a single logical step of your workflow. This allows Prefect to better contain task failures.
There’s nothing stopping you from putting all of your code in a single task. However, if any line of code fails, the entire task fails and must be retried from the beginning. Avoid this by splitting the code into multiple dependent tasks.
Task configuration
Tasks allow for customization through optional arguments that can be provided to the task decorator.
Argument | Description |
---|---|
name | An optional name for the task. If not provided, the name is inferred from the function name. |
description | An optional string description for the task. If not provided, the description is pulled from the docstring for the decorated function. |
tags | An optional set of tags associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime. |
cache_key_fn | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result is restored instead of running the task again. |
cache_expiration | An optional amount of time indicating how long cached states for this task are restorable; if not provided, cached states will never expire. |
retries | An optional number of times to retry on task run failure. |
retry_delay_seconds | An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero. |
log_prints | An optional boolean indicating whether to log print statements. |
See all possible options in the Python SDK docs.
For example, you can provide a name
value for the task. Here’s an example of the optional description
argument
as well:
@task(name="hello-task",
description="This task says hello.")
def my_task():
print("Hello, I'm a task")
You can distinguish runs of this task by providing a task_run_name
; this setting accepts a string
that may contain templated references to the keyword arguments of your task. The name is
formatted using Python’s standard string formatting syntax:
import datetime
from prefect import flow, task
@task(name="My Example Task",
description="An example task for a tutorial.",
task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
pass
@flow
def my_flow():
# creates a run with a name like "hello-marvin-on-Thursday"
my_task(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))
Additionally this setting accepts a function that returns a string for the task run name:
import datetime
from prefect import flow, task
def generate_task_name():
date = datetime.datetime.now(datetime.timezone.utc)
return f"{date:%A}-is-a-lovely-day"
@task(name="My Example Task",
description="An example task for a tutorial.",
task_run_name=generate_task_name)
def my_task(name):
pass
@flow
def my_flow():
# creates a run with a name like "Thursday-is-a-lovely-day"
my_task(name="marvin")
If you need access to information about the task, use the prefect.runtime
module. For example:
from prefect import flow
from prefect.runtime import flow_run, task_run
def generate_task_name():
flow_name = flow_run.flow_name
task_name = task_run.task_name
parameters = task_run.parameters
name = parameters["name"]
limit = parameters["limit"]
return f"{flow_name}-{task_name}-with-{name}-and-{limit}"
@task(name="my-example-task",
description="An example task for a tutorial.",
task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
pass
@flow
def my_flow(name: str):
# creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
my_task(name="marvin")
Tags
Tags are optional string labels that enable you to identify and group tasks other than by name or flow. Tags are useful to:
- Filter task runs by tag in the UI and through the Prefect REST API.
- Set concurrency limits on task runs by tag.
You may specify tags as a keyword argument on the task decorator.
@task(name="hello-task", tags=["test"])
def my_task():
print("Hello, I'm a task")
You can also provide tags as an argument with a
tags
context manager,
specifying tags when the task is called rather than in its definition.
from prefect import flow, task
from prefect import tags
@task
def my_task():
print("Hello, I'm a task")
@flow
def my_flow():
with tags("test"):
my_task()
Timeouts
Task timeouts prevent unintentional long-running tasks. When the duration of execution for a
task exceeds the duration specified in the timeout, a timeout exception is raised and the task is
marked as failed. In the UI, the task is visibly designated as TimedOut
. From the perspective of the
flow, the timed-out task is treated like any other failed task.
Specify timeout durations with the timeout_seconds
keyword argument:
from prefect import task
import time
@task(timeout_seconds=1, log_prints=True)
def show_timeouts():
print("I will execute")
time.sleep(5)
print("I will not execute")
Task results
Depending on how you call tasks, they can return different types of results and optionally engage the use of a task runner.
Any task can return:
- Data , such as
int
,str
,dict
,list
. This is the default behavior any time you callyour_task()
. PrefectFuture
. This is achieved by callingyour_task.submit()
. APrefectFuture
contains both data and State.- Prefect
State
. Anytime you call your task or flow with the argumentreturn_state=True
, it directly returns a state to build custom behavior based on a state change you care about, such as task or flow failing or retrying.
To run your task with a task runner, you must call the task
with .submit()
.
See state returned values for examples.
Task runners are optional
If you just need the result from a task, call the task from your flow. For most workflows, the default behavior of calling a task directly and receiving a result is enough.
Was this page helpful?