prefect.task_runners

Classes

TaskRunner

Abstract base class for task runners. A task runner is responsible for submitting tasks to the task run engine running in an execution environment. Submitted tasks are non-blocking and return a future object that can be used to wait for the task to complete and retrieve the result. Task runners are context managers and should be used in a with block to ensure proper cleanup of resources.

Methods:

duplicate

duplicate(self) -> Self
Return a new instance of this task runner with the same configuration.
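As a rough illustration of what "same configuration" can mean, the sketch below uses a hypothetical SketchRunner class (not part of Prefect). Its duplicate method builds a fresh instance from the stored constructor arguments, so runtime state such as "already started" is not carried over. Prefect's own runners may implement this differently.

```python
from __future__ import annotations


class SketchRunner:
    """Hypothetical stand-in for a task runner; not Prefect's implementation."""

    def __init__(self, max_workers: int | None = None) -> None:
        self.max_workers = max_workers  # configuration, copied by duplicate()
        self.started = False            # runtime state, never copied

    def __enter__(self) -> SketchRunner:
        self.started = True
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.started = False

    def duplicate(self) -> SketchRunner:
        # Rebuild from configuration only; the new instance is not started.
        return type(self)(max_workers=self.max_workers)


with SketchRunner(max_workers=3) as original:
    clone = original.duplicate()
    clone_started_inside_block = clone.started
```

Using type(self) rather than the class name keeps the sketch correct for subclasses, which is the same idea the Self return annotation expresses.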

map

map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any | unmapped[Any] | allow_failure[Any]], wait_for: Iterable[PrefectFuture[R]] | None = None) -> PrefectFutureList[F]
Submit multiple tasks to the task run engine.
Args:
  • task: The task to submit.
  • parameters: The parameters to use when running the task.
  • wait_for: A list of futures that the task depends on.
Returns:
  • An iterable of future objects that can be used to wait for the tasks to complete and retrieve the results.
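To make the shape of the parameters argument more concrete, here is a minimal pure-Python sketch of how one dictionary of mapped and unmapped values could be expanded into one parameter dictionary per task run. The Unmapped class and expand_parameters function are hypothetical names for this illustration, and the sketch assumes every value that is not wrapped is an iterable; it is not Prefect's implementation.

```python
from __future__ import annotations

from typing import Any


class Unmapped:
    """Hypothetical marker: pass this value unchanged to every call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def expand_parameters(parameters: dict[str, Any]) -> list[dict[str, Any]]:
    """Expand mapped/unmapped values into one parameter dict per task run."""
    mapped = {k: list(v) for k, v in parameters.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in parameters.items() if isinstance(v, Unmapped)}

    if not mapped:
        # Choice made for this sketch: mapping needs at least one iterable.
        raise ValueError("No iterable parameters to map over")

    lengths = {len(values) for values in mapped.values()}
    if len(lengths) != 1:
        raise ValueError(f"Mapped parameters differ in length: {sorted(lengths)}")

    (count,) = lengths
    return [
        {**{k: values[i] for k, values in mapped.items()}, **constant}
        for i in range(count)
    ]


calls = expand_parameters({"x": [1, 2, 3], "scale": Unmapped(10)})
```

Each dictionary in calls corresponds to one submitted task run, which is why the method returns a list of futures rather than a single future.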

name

name(self) -> str
The name of this task runner.

submit

submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> F

submit

submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> F

submit

submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> F
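The non-blocking submit-then-wait pattern resembles futures in the Python standard library. The sketch below uses concurrent.futures.ThreadPoolExecutor purely as an analogy; it does not use Prefect, and gated_double is a hypothetical function that waits on an event so the ordering is deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()


def gated_double(x: int) -> int:
    # Block until the main thread opens the gate, then do the work.
    gate.wait(timeout=5)
    return x * 2


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(gated_double, 21)   # returns without waiting for the work
    done_before_release = future.done()      # the work is still blocked on the gate
    gate.set()
    value = future.result()                  # blocks until the work has finished
```

The with block plays the same role as for a task runner: leaving it shuts the pool down and releases its threads.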

ThreadPoolTaskRunner

A task runner that executes tasks in a separate thread pool.
Examples:
Use a thread pool task runner with a flow:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def some_io_bound_task(x: int) -> int:
    # making a query to a database, reading a file, etc.
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3)) # use at most 3 threads at a time
def my_io_bound_flow():
    futures = []
    for i in range(10):
        future = some_io_bound_task.submit(i * 100)
        futures.append(future)

    return [future.result() for future in futures]
Use a thread pool task runner as a context manager:
from prefect import task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def some_io_bound_task(x: int) -> int:
    # making a query to a database, reading a file, etc.
    return x * 2

# Use the runner directly
with ThreadPoolTaskRunner(max_workers=2) as runner:
    future1 = runner.submit(some_io_bound_task, {"x": 1})
    future2 = runner.submit(some_io_bound_task, {"x": 2})

    result1 = future1.result()  # 2
    result2 = future2.result()  # 4
Configure max workers via settings:
# Set via environment variable
# export PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS=8

from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())  # Uses 8 workers from setting
def my_flow():
    ...
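One plausible way to think about the settings-based configuration is as a resolution order. The sketch below assumes that an explicit max_workers argument wins over the environment setting, which in turn wins over a built-in fallback; that precedence is an assumption of this sketch. resolve_max_workers and SKETCH_MAX_WORKERS are hypothetical names, not Prefect APIs or settings.

```python
from __future__ import annotations

import os


def resolve_max_workers(explicit: int | None, env_var: str, fallback: int) -> int:
    """Pick a worker count: explicit argument, then environment, then fallback."""
    if explicit is not None:
        return explicit
    raw = os.environ.get(env_var)
    if raw:
        return int(raw)
    return fallback


# SKETCH_MAX_WORKERS is a made-up variable name used only for this demo.
os.environ["SKETCH_MAX_WORKERS"] = "8"
from_setting = resolve_max_workers(None, "SKETCH_MAX_WORKERS", fallback=4)
from_argument = resolve_max_workers(3, "SKETCH_MAX_WORKERS", fallback=4)

del os.environ["SKETCH_MAX_WORKERS"]
from_fallback = resolve_max_workers(None, "SKETCH_MAX_WORKERS", fallback=4)
```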
Methods:

cancel_all

cancel_all(self) -> None

duplicate

duplicate(self) -> 'ThreadPoolTaskRunner[R]'

map

map(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]

map

map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]

map

map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]

submit

submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]

submit

submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]

submit

submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
Submit a task to the task run engine running in a separate thread.
Args:
  • task: The task to submit.
  • parameters: The parameters to use when running the task.
  • wait_for: A list of futures that the task depends on.
  • dependencies: A dictionary of dependencies for the task.
Returns:
  • A future object that can be used to wait for the task to complete and retrieve the result.
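The idea behind wait_for, running a task only after the futures it depends on have finished, can be sketched with the standard library alone. submit_after and record are hypothetical helpers written for this illustration; the sketch is an analogy and does not show how Prefect resolves dependencies internally.

```python
from __future__ import annotations

from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable


def submit_after(
    pool: ThreadPoolExecutor,
    fn: Callable[..., Any],
    *args: Any,
    wait_for: Iterable[Future] = (),
) -> Future:
    """Submit fn so that it starts only once every upstream future is done."""
    upstream = list(wait_for)

    def run() -> Any:
        wait(upstream)  # blocks this worker thread until upstream work finishes
        return fn(*args)

    return pool.submit(run)


events: list[str] = []


def record(name: str) -> str:
    events.append(name)
    return name


with ThreadPoolExecutor(max_workers=2) as pool:
    first = pool.submit(record, "extract")
    second = submit_after(pool, record, "load", wait_for=[first])
    result = second.result()
```

Note that in this sketch a waiting task occupies a worker thread while it waits, so upstream work must be submitted before the tasks that depend on it.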

ProcessPoolTaskRunner

A task runner that executes tasks in a separate process pool. This task runner uses ProcessPoolExecutor to run tasks in separate processes, providing true parallelism for CPU-bound tasks and process isolation. Tasks are executed with proper context propagation and error handling.
Examples:
Use a process pool task runner with a flow:
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def compute_heavy_task(n: int) -> int:
    # CPU-intensive computation that benefits from process isolation
    return sum(i ** 2 for i in range(n))

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def my_flow():
    futures = []
    for i in range(10):
        future = compute_heavy_task.submit(i * 1000)
        futures.append(future)

    return [future.result() for future in futures]
Use a process pool task runner as a context manager:
from prefect import task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(x: int) -> int:
    return x * 2

# Use the runner directly
with ProcessPoolTaskRunner(max_workers=2) as runner:
    future1 = runner.submit(my_task, {"x": 1})
    future2 = runner.submit(my_task, {"x": 2})

    result1 = future1.result()  # 2
    result2 = future2.result()  # 4
Configure max workers via settings:
# Set via environment variable
# export PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS=8

from prefect import flow
from prefect.task_runners import ProcessPoolTaskRunner

@flow(task_runner=ProcessPoolTaskRunner())  # Uses 8 workers from setting
def my_flow():
    ...
Methods:

cancel_all

cancel_all(self) -> None

duplicate

duplicate(self) -> Self

map

map(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]

map

map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]

map

map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]

submit

submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]

submit

submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]

submit

submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
Submit a task to the task run engine running in a separate process.
Args:
  • task: The task to submit.
  • parameters: The parameters to use when running the task.
  • wait_for: A list of futures that the task depends on.
  • dependencies: A dictionary of dependencies for the task.
Returns:
  • A future object that can be used to wait for the task to complete and retrieve the result.

PrefectTaskRunner

Methods:

duplicate

duplicate(self) -> 'PrefectTaskRunner[R]'

map

map(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDistributedFuture[R]]

map

map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDistributedFuture[R]]

map

map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDistributedFuture[R]]

submit

submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectDistributedFuture[R]

submit

submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectDistributedFuture[R]

submit

submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectDistributedFuture[R]
Submit a task to the task run engine. The returned future is a PrefectDistributedFuture.
Args:
  • task: The task to submit.
  • parameters: The parameters to use when running the task.
  • wait_for: A list of futures that the task depends on.
  • dependencies: A dictionary of dependencies for the task.
Returns:
  • A future object that can be used to wait for the task to complete and retrieve the result.