The prefect-dask integration makes it easy to accelerate your flow runs with Dask.
Getting started
Install prefect-dask
The following command will install a version of prefect-dask compatible with your installed version of prefect.
If you don’t already have prefect installed, it will install the newest version of prefect as well.
Install prefect and prefect-dask: pip install -U prefect-dask
Why use Dask?
Say your flow downloads many images to train a machine learning model. It takes longer than you’d like for the flow to run because it executes sequentially. To accelerate your flow code, parallelize it with prefect-dask in three steps:
- Add the import: from prefect_dask import DaskTaskRunner
- Specify the task runner in the flow decorator: @flow(task_runner=DaskTaskRunner)
- Submit tasks to the flow’s task runner: a_task.submit(*args, **kwargs)
Sequential by default:
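Here is a minimal sketch of the sequential version, with a stand-in download_image task that simulates each download with a one-second sleep:

```python
import time

from prefect import flow, task


@task
def download_image(item: int) -> str:
    # Stand-in for a real download; each call takes roughly one second.
    time.sleep(1)
    return f"image-{item}.jpg"


@flow
def download_images() -> list[str]:
    # Each task call blocks until it finishes, so the flow runs sequentially.
    return [download_image(item) for item in range(10)]


if __name__ == "__main__":
    download_images()
```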
Parallel with Dask:
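And the same flow parallelized with the three steps above (same stand-in task; the task runner is instantiated explicitly here):

```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def download_image(item: int) -> str:
    # Stand-in for a real download; each call takes roughly one second.
    time.sleep(1)
    return f"image-{item}.jpg"


@flow(task_runner=DaskTaskRunner())
def download_images() -> list[str]:
    # Submitted tasks run in parallel on a temporary local Dask cluster.
    futures = [download_image.submit(item) for item in range(10)]
    return [future.result() for future in futures]


if __name__ == "__main__":
    download_images()
```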
DaskTaskRunner reduced the runtime to 5.7 seconds!
Run tasks on Dask
The DaskTaskRunner is a task runner that submits tasks to the dask.distributed scheduler.
By default, when the DaskTaskRunner is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.
If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the address kwarg.
DaskTaskRunner accepts the following optional parameters:
| Parameter | Description | 
|---|---|
| address | Address of a currently running Dask scheduler. | 
| cluster_class | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, "distributed.LocalCluster"), or the class itself. | 
| cluster_kwargs | Additional kwargs to pass to the cluster_class when creating a temporary Dask cluster. |
| adapt_kwargs | Additional kwargs to pass to cluster.adapt when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided. |
| client_kwargs | Additional kwargs to use when creating a dask.distributed.Client. | 
Multiprocessing safety
Because the DaskTaskRunner uses multiprocessing, calls to flows in scripts must be guarded with if __name__ == "__main__": or you will encounter warnings and errors.
If you don't provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically.
The number of workers used is based on the number of cores on your machine.
The default provides a mix of processes and threads that work well for most workloads.
To specify this explicitly, pass values for n_workers or threads_per_worker to cluster_kwargs:
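For example, a minimal sketch (the flow body is a placeholder) that pins the temporary local cluster to four worker processes with two threads each:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
    )
)
def my_flow():
    # Placeholder flow body; tasks submitted here run on the 4-worker cluster.
    ...
```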
Use a temporary cluster
The DaskTaskRunner can create a temporary cluster using any of Dask’s cluster-manager options.
This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.
To configure it, provide a cluster_class.
This can be:
- A string specifying the import path to the cluster class (for example, "dask_cloudprovider.aws.FargateCluster")
- The cluster class itself
- A function for creating a custom cluster
You can also configure cluster_kwargs. This takes a dictionary of keyword arguments to pass to cluster_class when starting the flow run.
For example, to configure a flow to use a temporary dask_cloudprovider.aws.FargateCluster with four workers running with an image named my-prefect-image:
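A minimal sketch of that configuration, assuming dask-cloudprovider is installed and AWS credentials are available (the flow body is a placeholder):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
    )
)
def my_flow():
    # Placeholder flow body; a FargateCluster is created for each flow run.
    ...
```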
Connect to an existing cluster
Multiple Prefect flow runs can use the same existing Dask cluster. You might manage a single long-running Dask cluster (for example, using the Dask Helm Chart) and configure flows to connect to it during execution. This has disadvantages compared to using a temporary Dask cluster:
- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job sharing resources between tasks, but you may still run into issues.
To configure a DaskTaskRunner to connect to an existing cluster, pass in the address of the scheduler to the address argument:
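For example (the scheduler URL and flow body below are placeholders):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(task_runner=DaskTaskRunner(address="tcp://my-dask-scheduler:8786"))
def my_flow():
    # Placeholder flow body; tasks submitted here run on the existing cluster.
    ...
```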
Distribute Dask collections across workers
If your work is in a Dask collection, such as a dask.dataframe.DataFrame, you can distribute its computation across the workers of your Dask cluster.
With prefect-dask, it takes just a few steps:
- Add imports
- Add task and flow decorators
- Use the get_dask_client context manager to distribute work across Dask workers
- Specify the task runner and client’s address in the flow decorator
- Submit the tasks to the flow’s task runner
Without Prefect:
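A minimal sketch of the plain Dask version, using Dask's bundled demo timeseries dataset as a stand-in for real data (requires the dask dataframe and distributed extras):

```python
import dask.datasets
from dask.distributed import Client


def read_data(start: str, end: str):
    # Demo dataset bundled with Dask; returns a lazy dask.dataframe.DataFrame.
    return dask.datasets.timeseries(start, end, partition_freq="4w")


def process_data(df):
    # Trigger the distributed computation and return a concrete result.
    return df.describe().compute()


if __name__ == "__main__":
    with Client() as client:
        df = read_data("2000", "2001")
        summary = process_data(df)
        print(summary)
```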
With Prefect:
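And a sketch of the same work with Prefect: tasks are submitted to the DaskTaskRunner, and get_dask_client inside a task distributes the dataframe computation across the flow run's Dask workers (the dataset, task names, and flow name are stand-ins):

```python
import dask.datasets
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def read_data(start: str, end: str):
    # Demo dataset bundled with Dask; returns a lazy dask.dataframe.DataFrame.
    return dask.datasets.timeseries(start, end, partition_freq="4w")


@task
def process_data(df):
    # get_dask_client connects to the flow run's Dask cluster so the
    # collection's computation is spread across its workers.
    with get_dask_client():
        return df.describe().compute()


# Pass address="tcp://<scheduler>:8786" to DaskTaskRunner to reuse an existing cluster.
@flow(task_runner=DaskTaskRunner())
def dask_flow():
    df = read_data.submit("2000", "2001")
    summary = process_data.submit(df)
    return summary


if __name__ == "__main__":
    dask_flow()
```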
Configure adaptive scaling
A key feature of using a DaskTaskRunner is the ability to scale adaptively to the workload.
Instead of specifying n_workers as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.
To do this, pass adapt_kwargs to DaskTaskRunner.
This takes the following fields:
- maximum (int or None, optional): the maximum number of workers to scale to. Set to None for no maximum.
- minimum (int or None, optional): the minimum number of workers to scale to. Set to None for no minimum.
For example, to configure a FargateCluster scaling up to a maximum of 10 workers:
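A minimal sketch of that configuration, again assuming dask-cloudprovider is installed and AWS access is configured:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        adapt_kwargs={"maximum": 10},
    )
)
def my_flow():
    # Placeholder flow body; the cluster scales up to 10 workers as needed.
    ...
```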
Use Dask annotations
Use Dask annotations to further control the behavior of tasks. For example, set the priority of tasks in the Dask scheduler:
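For example, a sketch that uses dask.annotate to give one submitted task a lower scheduler priority than another (the show task is a stand-in):

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def show(x):
    print(x)


@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        show.submit(1)  # scheduled with low priority on the Dask scheduler
    with dask.annotate(priority=10):
        show.submit(2)  # scheduled with high priority on the Dask scheduler


if __name__ == "__main__":
    my_flow()
```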
Additional Resources
Refer to the prefect-dask SDK documentation to explore all the capabilities of the prefect-dask library.
For assistance using Dask, consult the Dask documentation.
Resolving futures in sync client
Note, by default, dask_collection.compute() returns concrete values while client.compute(dask_collection) returns Dask Futures. Therefore, if you call client.compute, you must resolve all futures before exiting the context manager by either:
- setting sync=True
- calling result()
For more information, visit the Dask docs on Waiting on Futures.
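A sketch of both options inside the synchronous context manager, using the demo timeseries dataset as a stand-in:

```python
import dask.datasets
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def describe_data():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")

        # Option 1: block until the computation finishes and return concrete values.
        summary = client.compute(df.describe(), sync=True)

        # Option 2: resolve the returned future before leaving the context manager.
        future = client.compute(df.describe())
        summary = future.result()

    return summary


@flow(task_runner=DaskTaskRunner())
def my_flow():
    return describe_data.submit()


if __name__ == "__main__":
    my_flow()
```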
There is also an equivalent context manager for asynchronous tasks and flows: get_async_dask_client. When using the async client, you must await client.compute(dask_collection) before exiting the context manager.
Note that task submission (.submit()) and future resolution (.result()) are always synchronous operations in Prefect, even when working with async tasks and flows.