Start a background task worker
To start a worker to run background tasks for a single function, define a task and call .serve()
on it.
from prefect import task
@task(log_prints=True)
def add(a: int, b: int):
print(f"{a} + {b} = {a + b}")
add.serve()
Trigger a background task
To trigger a background task, call .delay()
on the same task in a different terminal.
.delay()
has the same signature as the @task
decorated function.
Start a worker to run background tasks for multiple functions
To start a worker to run background tasks for multiple functions, call serve()
with the tasks you want to run.
from prefect import task
from prefect.task_worker import serve
@task(log_prints=True)
def add(a: int, b: int):
print(f"{a} + {b} = {a + b}")
@task(log_prints=True)
def multiply(a: int, b: int):
print(f"{a} * {b} = {a * b}")
# Both these invocations will run once the worker is started
add.delay(1, 2)
multiply.delay(1, 4)
serve(add, multiply) # start a task worker listening for both `add` and `multiply`
Submit multiple background tasks
To submit multiple background tasks, call .map()
on the task with deferred=True
.
from prefect import task
@task(log_prints=True)
def add(a: int, b: int):
print(f"{a} + {b} = {a + b}")
A = [1, 2, 3]
B = [4, 5, 6]
add.map(A, B, deferred=True)
add.serve()
.map()
accepts Iterable[P.args]
, Iterable[P.kwargs]
or unmapped
inputs.
If you want to keep one argument mapped and the other unmapped, you can do so by passing unmapped
to the mapped argument.
from prefect import task, unmapped
@task(log_prints=True)
def add(a: int, b: int):
print(f"{a} + {b} = {a + b}")
A = [1, 2, 3]
add.map(A, unmapped(4), deferred=True)
add.serve()
Further reading
Responses are generated using AI and may contain mistakes.