Tasks can be called directly (i.e. using __call__), but they will not run concurrently.
```python
import time

from prefect import task


@task
def stop_at_floor(floor: float):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")


stop_at_floor(0.1)  # blocks for 0.1 seconds
stop_at_floor(0.2)  # blocks for 0.2 seconds
stop_at_floor(0.3)  # blocks for 0.3 seconds
```
When you call a task directly, it will block the main thread until the task completes. Continue reading to learn how to run tasks concurrently via task runners.
Tasks in your workflows can be run concurrently using the .submit() method.
```python
import time

from prefect import flow, task
from prefect.futures import wait


@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")


@flow
def elevator():
    floors = []
    for floor in range(10, 0, -1):
        # .submit() returns a PrefectFuture right away instead of blocking
        floors.append(stop_at_floor.submit(floor))
    # Block until every submitted task run has finished
    wait(floors)
```
By default, tasks are submitted via the ThreadPoolTaskRunner, which runs tasks concurrently in a thread pool.
A common mistake is to pass non-thread-safe objects to tasks that you submit to a task runner; avoid doing this for any task you intend to run concurrently. See Design Considerations for more information.
When you submit a task, you’ll receive a PrefectFuture object back which you can use to track the task’s execution. Use the .result() method to get the result of the task.
```python
from prefect import flow, task


@task
def cool_task():
    return "sup"


@flow
def my_workflow():
    future = cool_task.submit()
    result = future.result()
    print(result)
```
If you don’t need the result of the task, you can use the .wait() method to wait for execution to complete.
To wait for multiple futures to complete, use the wait() utility.
```python
from prefect import flow, task
from prefect.futures import wait


@task
def cool_task():
    return "sup"


@flow
def my_workflow():
    futures = [cool_task.submit() for _ in range(10)]
    wait(futures)
```
Passing futures between tasks

If you pass PrefectFuture objects between tasks or flows, the task or flow receiving the future will wait for the future to complete before starting execution.
```python
from prefect import flow, task


@task
def cool_task():
    return "sup"


@task
def what_did_cool_task_say(what_it_said: str):
    return f"cool task said {what_it_said}"


@flow
def my_workflow():
    future = cool_task.submit()
    print(what_did_cool_task_say(future))
```
Sometimes you may not want to map a task over a certain input value; wrap such a value in unmapped so that every task run receives it whole. By default, non-iterable values are not mapped over, so unmapped is not required for them.
When you use .map, the return value is a list of futures. You can wait for these futures or retrieve their results with the wait or result methods.
To model more complex concurrent workflows, you can map tasks within other tasks:
```python
import re

from prefect import flow, task
from prefect.futures import wait


def count_words(text: str) -> int:
    """Count the number of words in a text."""
    return len(text.split())


def extract_emails(text: str) -> list[str]:
    return re.findall(r"[\w.+-]+@[\w-]+\.[\w.-]+", text)


@task
def analyze_texts(texts: list[str]) -> dict[str, list[int | list[str]]]:
    futures = {
        op.__name__: task(op).map(texts) for op in [count_words, extract_emails]
    }
    wait([f for futs in futures.values() for f in futs])
    return {name: [f.result() for f in futs] for name, futs in futures.items()}


@flow
def run_text_analysis():
    """Analyze a batch of social media posts with multiple operations."""
    results = analyze_texts(
        texts=[
            "Just visited #Paris! Contact me at visitor@example.com #travel #vacation",
            "Working on my new #project. Reach out at developer@example.com if interested!",
            "Happy to announce our company event #celebration #milestone email: events@company.org",
        ]
    )

    print("\nAnalysis Results:")
    print(f" Word counts: {results['count_words']}")
    print(f" Extracted emails: {results['extract_emails']}\n")

    return results


run_text_analysis()
```
Output
```
00:03:45.159 | INFO | Flow run 'hilarious-collie' - Beginning flow run 'hilarious-collie' for flow 'run-text-analysis'
00:03:45.233 | INFO | Task run 'count_words-01a' - Finished in state Completed()
00:03:45.236 | INFO | Task run 'extract_emails-7a7' - Finished in state Completed()
00:03:45.237 | INFO | Task run 'extract_emails-ca2' - Finished in state Completed()
00:03:45.239 | INFO | Task run 'count_words-01d' - Finished in state Completed()
00:03:45.240 | INFO | Task run 'count_words-f0a' - Finished in state Completed()
00:03:45.242 | INFO | Task run 'extract_emails-0c6' - Finished in state Completed()
00:03:45.247 | INFO | Task run 'analyze_texts-53b' - Finished in state Completed()

Analysis Results:
 Word counts: [9, 11, 10]
 Extracted emails: [['visitor@example.com'], ['developer@example.com'], ['events@company.org']]

00:03:45.491 | INFO | Flow run 'hilarious-collie' - Finished in state Completed()
```
This pattern is useful when you need to:
- Process combinations of parameters concurrently
- Apply multiple transformations to multiple datasets
- Create a grid of operations where each cell is an independent task
You can also use the wait_for=[] parameter when calling or submitting a task to specify upstream task dependencies. This lets you control the execution order of tasks that do not share data dependencies.
```python
from prefect import flow, task


@task
def task_a():
    pass


@task
def task_b():
    pass


@task
def task_c():
    pass


@task
def task_d():
    pass


@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])
```
Instead of failing your entire workflow when a single concurrent task fails, you may want to run concurrent work to completion, even if some tasks fail. To run tasks concurrently and handle failures afterwards, use the wait() utility along with the .state attribute on futures:
```python
from typing import Any

from prefect import flow, task
from prefect.futures import wait
from prefect.states import State


@task
def process_data(item: int) -> str:
    if item < 0:
        raise ValueError(f"Cannot process negative value: {item}")
    return f"Processed {item}"


@flow
def batch_processing():
    items = [1, -2, 3, -4, 5]

    # Submit all tasks and return the futures
    futures = process_data.map(items)

    # Wait for all futures to complete concurrently
    done, not_done = wait(futures)

    # Check each future's state
    successful: list[Any] = []
    failed: list[State] = []

    for future in done:
        if future.state.is_completed():
            successful.append(future.result())
        else:
            failed.append(future.state)

    print(f"Processed {len(successful)} items successfully")
    print(f"Failed to process {len(failed)} items")

    return successful
```
The wait() function returns two sets of futures: done and not_done. Each future has a .state attribute you can inspect to determine how the associated task run’s result (or exception) should be treated downstream.
The “right way” to handle failures will depend on your use case. Review the final state determination documentation for more information.
```
00:56:47.873 | INFO | Flow run 'precious-walrus' - Beginning flow run 'precious-walrus' for flow 'evaluate-models'
00:56:47.981 | INFO | Task run 'train on customers with random_forest' - Finished in state Completed()
00:56:47.984 | INFO | Task run 'train on orders with gradient_boosting' - Finished in state Completed()
00:56:47.984 | INFO | Task run 'train on customers with gradient_boosting' - Finished in state Completed()
00:56:47.985 | INFO | Task run 'train on products with random_forest' - Finished in state Completed()
00:56:47.988 | INFO | Task run 'train on orders with random_forest' - Finished in state Completed()
00:56:47.990 | INFO | Task run 'train on products with gradient_boosting' - Finished in state Completed()
Best model: {'dataset': 'products', 'model': 'random_forest', 'score': 0.5603239415052655}
00:56:48.121 | INFO | Flow run 'precious-walrus' - Finished in state Completed()
```