Calling tasks directly

Tasks can be called directly (i.e. using __call__), but they will not run concurrently.

import time

from prefect import task

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

stop_at_floor(1) # blocks for 1 second
stop_at_floor(2) # blocks for 2 seconds
stop_at_floor(3) # blocks for 3 seconds

When you call a task directly, it will block the main thread until the task completes.

Continue reading to learn how to run tasks concurrently via task runners.

Using .submit

Tasks in your workflows can be run concurrently using the .submit() method.

import time

from prefect import flow, task
from prefect.futures import wait

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow
def elevator():
    floors = []
    for floor in range(10, 0, -1):
        floors.append(stop_at_floor.submit(floor))
    wait(floors)

By default, tasks are submitted via the ThreadPoolTaskRunner, which runs tasks concurrently in a thread pool.

A common mistake is to pass non-thread-safe objects to tasks that you submit to a task runner; avoid doing this for any tasks you intend to run concurrently.
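For example, rather than sharing a single connection object across submitted tasks, pass plain data and create the resource inside each task. A minimal sketch, assuming a local SQLite database (the table and flow names are hypothetical):

import sqlite3

from prefect import flow, task
from prefect.futures import wait

@task
def insert_value(db_path: str, value: int):
    # Open a fresh connection inside the task so each worker thread gets its own;
    # sqlite3 connections are not safe to share across threads by default.
    with sqlite3.connect(db_path) as conn:
        conn.execute("INSERT INTO numbers (n) VALUES (?)", (value,))

@flow
def load_numbers(db_path: str = "numbers.db"):
    with sqlite3.connect(db_path) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS numbers (n INTEGER)")
    wait([insert_value.submit(db_path, n) for n in range(5)])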

See Design Considerations for more information.

Using a different task runner

To run submitted tasks with a different task runner, you can pass a task_runner argument to the @flow decorator.

To run this example, you'll need to install the prefect[dask] extra (e.g. pip install "prefect[dask]").

import time

from prefect import flow, task
from prefect.futures import wait
from prefect_dask.task_runners import DaskTaskRunner

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow(task_runner=DaskTaskRunner())
def elevator():
    floors = []
    for floor in range(10, 0, -1):
        floors.append(stop_at_floor.submit(floor))
    wait(floors)

Handling futures

When you submit a task, you'll receive a PrefectFuture object that you can use to track the task's execution. Use the .result() method to get the result of the task.

from prefect import task, flow

@task
def cool_task():
    return "sup"
    
@flow
def my_workflow():
    future = cool_task.submit()
    result = future.result()
    print(result)

If you don’t need the result of the task, you can use the .wait() method to wait for execution to complete.

from prefect import task, flow

@task
def cool_task():
    return "sup"
    
@flow
def my_workflow():
    future = cool_task.submit()
    future.wait()

To wait for multiple futures to complete, use the wait() utility.

from prefect import task, flow
from prefect.futures import wait


@task
def cool_task():
    return "sup"
    
    
@flow
def my_workflow():
    futures = [cool_task.submit() for _ in range(10)]
    wait(futures)

Passing futures between tasks

If you pass PrefectFuture objects between tasks or flows, the task or flow receiving the future will wait for the future to complete before starting execution.

from prefect import task, flow

@task
def cool_task():
    return "sup"


@task
def what_did_cool_task_say(what_it_said: str):
    return f"cool task said {what_it_said}"

@flow
def my_workflow():
    future = cool_task.submit()
    print(what_did_cool_task_say(future))

Using .map

For a convenient way to submit a task run for each element of an iterable, use the .map() method.

import time

from prefect import flow, task
from prefect.futures import wait

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow
def elevator():
    floors = list(range(10, 0, -1))
    floors = stop_at_floor.map(floors)
    wait(floors)

Like .submit, .map uses the task runner configured on the parent flow. Changing the task runner will change where mapped tasks are executed.
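For example, you can stay on the default thread pool but cap how many task runs execute at once. A minimal sketch, assuming the ThreadPoolTaskRunner from prefect.task_runners accepts a max_workers argument:

import time

from prefect import flow, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

# At most 3 mapped task runs execute at the same time.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))
def elevator():
    floors = stop_at_floor.map(list(range(10, 0, -1)))
    wait(floors)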

Using the unmapped annotation

Sometimes you may not want to map a task over a certain input value.

By default, non-iterable values will not be mapped over (so unmapped is not required):

from prefect import flow, task

@task
def add_together(x, y):
    return x + y

@flow
def sum_it(numbers: list[int], static_value: int):
    futures = add_together.map(numbers, static_value)
    return futures.result()

resulting_sum = sum_it([1, 2, 3], 5)
assert resulting_sum == [6, 7, 8]

… but if your argument is an iterable type, wrap it with unmapped to tell .map to treat it as static:

from prefect import flow, task, unmapped

@task
def sum_plus(x, static_iterable):
    return x + sum(static_iterable)

@flow
def sum_it(numbers, static_iterable):
    futures = sum_plus.map(numbers, unmapped(static_iterable))
    return futures.result()

resulting_sum = sum_it([4, 5, 6], [1, 2, 3])
assert resulting_sum == [10, 11, 12]

Bulk PrefectFuture operations

When using .map as in the above example, the .map call returns a list of futures.

You can wait for or retrieve the results from these futures with the wait and result methods:

futures = some_task.map(some_iterable)
results = futures.result()

which is syntactic sugar for the corresponding list comprehension:

futures = some_task.map(some_iterable)
results = [future.result() for future in futures]

Nested mapped tasks

To model more complex concurrent workflows, you can map tasks within other tasks:

import re

from prefect import flow, task
from prefect.futures import wait

def count_words(text: str) -> int:
    """Count the number of words in a text."""
    return len(text.split())

def extract_emails(text: str) -> list[str]:
    return re.findall(r"[\w.+-]+@[\w-]+\.[\w.-]+", text)

@task
def analyze_texts(texts: list[str]) -> dict[str, list[int | list[str]]]:
    futures = {
        op.__name__: task(op).map(texts) for op in [count_words, extract_emails]
    }
    wait([f for futs in futures.values() for f in futs])
    return {name: [f.result() for f in futs] for name, futs in futures.items()}

@flow
def run_text_analysis():
    """Analyze a batch of social media posts with multiple operations."""
    results = analyze_texts(
        texts=[
            "Just visited #Paris! Contact me at visitor@example.com #travel #vacation",
            "Working on my new #project. Reach out at developer@example.com if interested!",
            "Happy to announce our company event #celebration #milestone email: events@company.org",
        ]
    )
    print("\nAnalysis Results:")
    print(f"  Word counts: {results['count_words']}")
    print(f"  Extracted emails: {results['extract_emails']}\n")
    return results

run_text_analysis()

This pattern is useful when you need to:

  1. Process combinations of parameters concurrently
  2. Apply multiple transformations to multiple datasets
  3. Create a grid of operations where each cell is an independent task (see the sketch after this list)
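As a minimal sketch of the grid case, each (x, y) combination from two parameter lists becomes its own task run (run_cell and parameter_grid are hypothetical names; .map iterates over multiple iterables elementwise, like Python's built-in map):

from itertools import product

from prefect import flow, task

@task
def run_cell(x: int, y: int) -> int:
    return x * y

@flow
def parameter_grid():
    xs, ys = [1, 2, 3], [10, 20]
    # Expand the 3 x 2 grid into two parallel argument lists, then map over both.
    pairs = list(product(xs, ys))
    futures = run_cell.map([x for x, _ in pairs], [y for _, y in pairs])
    return futures.result()

print(parameter_grid())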

Creating state dependencies

You may also use the wait_for=[] parameter when calling a task to specify upstream task dependencies. This enables you to control task execution order for tasks that do not share data dependencies.

from prefect import flow, task
@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass
    
@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])

Using asyncio

If your tasks are defined as async functions, you can use asyncio from the Python standard library to run them concurrently.

import asyncio

from prefect import flow, task

@task
async def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    await asyncio.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow
async def elevator():
    floors = list(range(10, 0, -1))
    await asyncio.gather(*[stop_at_floor(floor) for floor in floors])


if __name__ == "__main__":
    asyncio.run(elevator())

Real-world applications

Mapped tasks are particularly valuable in common data science and ETL workflows such as:

  1. Machine learning model evaluation: Train multiple models on multiple datasets concurrently
  2. ETL pipelines: Process multiple data sources with multiple transformations
  3. API data enrichment: Enrich multiple records with data from multiple external services

For example, imagine you want to find the best training configuration for a series of datasets, and you want to process all datasets concurrently:

import random
from dataclasses import dataclass

from prefect import flow, task
from prefect.futures import PrefectFuture, wait

@dataclass
class Dataset:
    name: str

@dataclass
class ModelConfig:
    name: str

@task(task_run_name="train on {dataset.name} with {model_config.name}")
def train_model(dataset: Dataset, model_config: ModelConfig) -> dict:
    return {
        "dataset": dataset.name,
        "model": model_config.name,
        "score": random.random(),
    }

@flow
def evaluate_models(datasets: list[Dataset], model_configs: list[ModelConfig]):
    all_futures: list[PrefectFuture[dict[str, object]]] = []
    for dataset in datasets:
        futures = train_model.map(
            dataset=dataset,
            model_config=model_configs,
        )
        all_futures.extend(futures)

    results = [future.result() for future in wait(all_futures).done]

    print(f"\nBest model: {max(results, key=lambda r: r['score'])}")

evaluate_models(
    datasets=[
        Dataset("customers"), Dataset("products"), Dataset("orders")
    ],
    model_configs=[
        ModelConfig("random_forest"), ModelConfig("gradient_boosting")
    ],
)