Tasks in your workflows can be run concurrently using the .submit() method.
    import time

    from prefect import flow, task
    from prefect.futures import wait


    @task
    def stop_at_floor(floor: int):
        print(f"elevator moving to floor {floor}")
        time.sleep(floor)
        print(f"elevator stops on floor {floor}")


    @flow
    def elevator():
        floors = []
        for floor in range(10, 0, -1):
            floors.append(stop_at_floor.submit(floor))
        wait(floors)
By default, tasks are submitted via the ThreadPoolTaskRunner, which runs tasks concurrently in a thread pool.
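To make the thread-pool behavior concrete, the sketch below uses Python's standard-library `concurrent.futures.ThreadPoolExecutor` as a stand-in. It is not Prefect's task runner, and the names `run_elevator` and `seconds_per_floor` are only illustrative. It mirrors the elevator flow above: every call is submitted up front, so the waits overlap and the total time is close to the longest single wait rather than the sum of all waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def stop_at_floor(floor: int, seconds_per_floor: float = 0.1) -> int:
    """Simulate an elevator stop whose duration grows with the floor number."""
    time.sleep(floor * seconds_per_floor)
    return floor


def run_elevator(floors: list[int]) -> tuple[list[int], float]:
    """Submit one call per floor; return (completion order, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(floors)) as pool:
        # Every call is submitted before any result is collected,
        # so the sleeps overlap instead of running back to back.
        futures = [pool.submit(stop_at_floor, floor) for floor in floors]
        completion_order = [future.result() for future in as_completed(futures)]
    elapsed = time.perf_counter() - start
    return completion_order, elapsed


if __name__ == "__main__":
    order, elapsed = run_elevator([3, 2, 1])
    print(f"completion order: {order}, elapsed: {elapsed:.2f}s")
```

Although floor 3 is submitted first, it has the longest wait, so the lower floors typically complete before it.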
A common mistake is to pass non-thread-safe objects to tasks that you submit to a task runner. Because submitted tasks may run at the same time in different threads, avoid passing objects that are not thread-safe to tasks you intend to run concurrently.
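As a rough illustration of why this matters, the sketch below uses only the standard library (`threading` and `concurrent.futures`), not Prefect itself. `SafeCounter`, `bump`, and `run` are hypothetical names. The counter is shared by several worker threads, and its read-modify-write update is guarded by a lock so that no increments are lost. Giving each task its own object is another way to avoid the problem.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SafeCounter:
    """Hypothetical shared object whose mutation is guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.value = 0

    def increment(self) -> None:
        # `self.value += 1` is a read-modify-write and is not atomic;
        # the lock ensures only one thread performs it at a time.
        with self._lock:
            self.value += 1


def bump(counter: SafeCounter, times: int) -> None:
    """Increment the shared counter `times` times."""
    for _ in range(times):
        counter.increment()


def run(workers: int = 8, times: int = 10_000) -> int:
    """Share one counter across `workers` threads and return its final value."""
    counter = SafeCounter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(bump, counter, times) for _ in range(workers)]
        for future in futures:
            future.result()  # re-raise any exception raised in a worker
    return counter.value


if __name__ == "__main__":
    print(run())
```

With the lock in place, the final value equals `workers * times`. Without it, concurrent increments can overwrite one another.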
When you submit a task, you’ll receive a PrefectFuture object back which you can use to track the task’s execution. Use the .result() method to get the result of the task.
    from prefect import task, flow


    @task
    def cool_task():
        return "sup"


    @flow
    def my_workflow():
        future = cool_task.submit()
        result = future.result()
        print(result)
If you don’t need the result of the task, you can use the .wait() method to wait for execution to complete.
To wait for multiple futures to complete, use the wait() utility.
    from prefect import task, flow
    from prefect.futures import wait


    @task
    def cool_task():
        return "sup"


    @flow
    def my_workflow():
        futures = [cool_task.submit() for _ in range(10)]
        wait(futures)
Passing futures between tasks
If you pass PrefectFuture objects between tasks or flows, the task or flow receiving the future will wait for the future to complete before starting execution.
    from prefect import task, flow


    @task
    def cool_task():
        return "sup"


    @task
    def what_did_cool_task_say(what_it_said: str):
        return f"cool task said {what_it_said}"


    @flow
    def my_workflow():
        future = cool_task.submit()
        print(what_did_cool_task_say(future))
To model more complex concurrent workflows, you can map tasks within other tasks:
    import re

    from prefect import flow, task
    from prefect.futures import wait


    def count_words(text: str) -> int:
        """Count the number of words in a text."""
        return len(text.split())


    def extract_emails(text: str) -> list[str]:
        return re.findall(r"[\w.+-]+@[\w-]+\.[\w.-]+", text)


    @task
    def analyze_texts(texts: list[str]) -> dict[str, list[int | list[str]]]:
        futures = {
            op.__name__: task(op).map(texts) for op in [count_words, extract_emails]
        }
        wait([f for futs in futures.values() for f in futs])
        return {name: [f.result() for f in futs] for name, futs in futures.items()}


    @flow
    def run_text_analysis():
        """Analyze a batch of social media posts with multiple operations."""
        results = analyze_texts(
            texts=[
                "Just visited #Paris! Contact me at visitor@example.com #travel #vacation",
                "Working on my new #project. Reach out at developer@example.com if interested!",
                "Happy to announce our company event #celebration #milestone email: events@company.org",
            ]
        )
        print("\nAnalysis Results:")
        print(f" Word counts: {results['count_words']}")
        print(f" Extracted emails: {results['extract_emails']}\n")
        return results


    run_text_analysis()
    00:03:45.159 | INFO | Flow run 'hilarious-collie' - Beginning flow run 'hilarious-collie' for flow 'run-text-analysis'
    00:03:45.233 | INFO | Task run 'count_words-01a' - Finished in state Completed()
    00:03:45.236 | INFO | Task run 'extract_emails-7a7' - Finished in state Completed()
    00:03:45.237 | INFO | Task run 'extract_emails-ca2' - Finished in state Completed()
    00:03:45.239 | INFO | Task run 'count_words-01d' - Finished in state Completed()
    00:03:45.240 | INFO | Task run 'count_words-f0a' - Finished in state Completed()
    00:03:45.242 | INFO | Task run 'extract_emails-0c6' - Finished in state Completed()
    00:03:45.247 | INFO | Task run 'analyze_texts-53b' - Finished in state Completed()

    Analysis Results:
     Word counts: [9, 11, 10]
     Extracted emails: [['visitor@example.com'], ['developer@example.com'], ['events@company.org']]

    00:03:45.491 | INFO | Flow run 'hilarious-collie' - Finished in state Completed()
This pattern is useful when you need to:
- Process combinations of parameters concurrently
- Apply multiple transformations to multiple datasets
- Create a grid of operations where each cell is an independent task
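A grid like this can be sketched with the standard library alone. The example below is not Prefect code: it uses `itertools.product` to enumerate every dataset/model pair and a `ThreadPoolExecutor` to run one independent call per cell. The dataset and model names and the `score` function are placeholders chosen for illustration, and the score is an arbitrary deterministic number rather than a real model metric.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product

DATASETS = ["customers", "orders", "products"]   # placeholder names
MODELS = ["random_forest", "gradient_boosting"]  # placeholder names


def score(dataset: str, model: str) -> tuple[str, str, int]:
    """Placeholder for real work: return an arbitrary deterministic score."""
    return dataset, model, len(dataset) * len(model)


def evaluate_grid() -> list[tuple[str, str, int]]:
    """Run one independent call per (dataset, model) cell of the grid."""
    cells = list(product(DATASETS, MODELS))
    with ThreadPoolExecutor(max_workers=len(cells)) as pool:
        futures = [pool.submit(score, dataset, model) for dataset, model in cells]
        # Collect in submission order so results line up with `cells`.
        return [future.result() for future in futures]


if __name__ == "__main__":
    results = evaluate_grid()
    best = max(results, key=lambda cell: cell[2])
    print(f"Best cell: {best}")
```

Because no cell depends on another, every cell can be submitted before any result is collected.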
You can also pass the wait_for=[] parameter when calling a task to specify upstream task dependencies. This lets you control the execution order of tasks that do not share data dependencies.
    from prefect import flow, task


    @task
    def task_a():
        pass


    @task
    def task_b():
        pass


    @task
    def task_c():
        pass


    @task
    def task_d():
        pass


    @flow
    def my_flow():
        a = task_a.submit()
        b = task_b.submit()
        # Wait for task_a and task_b to complete
        c = task_c.submit(wait_for=[a, b])
        # task_d will wait for task_c to complete
        # Note: If waiting for one task it must still be in a list.
        d = task_d(wait_for=[c])
    00:56:47.873 | INFO | Flow run 'precious-walrus' - Beginning flow run 'precious-walrus' for flow 'evaluate-models'
    00:56:47.981 | INFO | Task run 'train on customers with random_forest' - Finished in state Completed()
    00:56:47.984 | INFO | Task run 'train on orders with gradient_boosting' - Finished in state Completed()
    00:56:47.984 | INFO | Task run 'train on customers with gradient_boosting' - Finished in state Completed()
    00:56:47.985 | INFO | Task run 'train on products with random_forest' - Finished in state Completed()
    00:56:47.988 | INFO | Task run 'train on orders with random_forest' - Finished in state Completed()
    00:56:47.990 | INFO | Task run 'train on products with gradient_boosting' - Finished in state Completed()
    Best model: {'dataset': 'products', 'model': 'random_forest', 'score': 0.5603239415052655}
    00:56:48.121 | INFO | Flow run 'precious-walrus' - Finished in state Completed()