The `prefect-dask` integration makes it easy to accelerate your flow runs with Dask.
Getting started
Install prefect-dask
The following command will install a version of `prefect-dask` compatible with your installed version of `prefect`. If you don’t already have `prefect` installed, it will install the newest version of `prefect` as well.
Upgrade to the latest versions of `prefect` and `prefect-dask`:
Why use Dask?
Say your flow downloads many images to train a machine learning model. It takes longer than you’d like for the flow to run because it executes sequentially. To accelerate your flow code, parallelize it with `prefect-dask` in three steps:
- Add the import: `from prefect_dask import DaskTaskRunner`
- Specify the task runner in the flow decorator: `@flow(task_runner=DaskTaskRunner)`
- Submit tasks to the flow’s task runner: `a_task.submit(*args, **kwargs)`
`DaskTaskRunner` reduced the runtime to 5.7 seconds!
Run tasks on Dask
The `DaskTaskRunner` is a task runner that submits tasks to the `dask.distributed` scheduler.
By default, when the `DaskTaskRunner` is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.
If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the `address` kwarg.
`DaskTaskRunner` accepts the following optional parameters:
Parameter | Description |
---|---|
`address` | Address of a currently running Dask scheduler. |
`cluster_class` | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, `"distributed.LocalCluster"`) or the class itself. |
`cluster_kwargs` | Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster. |
`adapt_kwargs` | Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided. |
`client_kwargs` | Additional kwargs to use when creating a `dask.distributed.Client`. |
Multiprocessing safety
Because the `DaskTaskRunner` uses multiprocessing, calls to flows in scripts must be guarded with `if __name__ == "__main__":` or you will encounter warnings and errors.
If you don’t provide the `address` of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers used is based on the number of cores on your machine. The default provides a mix of processes and threads that works well for most workloads. To specify this explicitly, pass values for `n_workers` or `threads_per_worker` to `cluster_kwargs`:
Use a temporary cluster
The `DaskTaskRunner` can create a temporary cluster using any of Dask’s cluster-manager options.
This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.
To configure it, provide a `cluster_class`. This can be:
- A string specifying the import path to the cluster class (for example, `"dask_cloudprovider.aws.FargateCluster"`)
- The cluster class itself
- A function for creating a custom cluster

Optionally, also provide `cluster_kwargs`. This takes a dictionary of keyword arguments to pass to `cluster_class` when starting the flow run.
For example, to configure a flow to use a temporary `dask_cloudprovider.aws.FargateCluster` with four workers running with an image named `my-prefect-image`:
Connect to an existing cluster
Multiple Prefect flow runs can use the same existing Dask cluster. You might manage a single long-running Dask cluster (for example, using the Dask Helm Chart) and configure flows to connect to it during execution. This has disadvantages compared to using a temporary Dask cluster:
- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to do a good job sharing resources between tasks, but you may still run into issues.
To configure a `DaskTaskRunner` to connect to an existing cluster, pass the address of the scheduler to the `address` argument:
To distribute computations on a Dask collection, such as a `dask.dataframe.DataFrame`, across your Dask cluster with `prefect-dask`, it takes just a few steps:
- Add imports
- Add `task` and `flow` decorators
- Use the `get_dask_client` context manager to distribute work across Dask workers
- Specify the task runner and client’s address in the flow decorator
- Submit the tasks to the flow’s task runner
Configure adaptive scaling
A key feature of using a `DaskTaskRunner` is the ability to scale adaptively to the workload.
Instead of specifying `n_workers` as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.
To do this, pass `adapt_kwargs` to `DaskTaskRunner`. This takes the following fields:
- `maximum` (`int` or `None`, optional): the maximum number of workers to scale to. Set to `None` for no maximum.
- `minimum` (`int` or `None`, optional): the minimum number of workers to scale to. Set to `None` for no minimum.
For example, to configure a flow to run on a `FargateCluster` scaling up to a maximum of 10 workers:
Use Dask annotations
Use Dask annotations to further control the behavior of tasks. For example, you can set the priority of tasks in the Dask scheduler by submitting them inside a `dask.annotate(priority=...)` block.
Additional Resources
Refer to the `prefect-dask` SDK documentation to explore all the capabilities of the `prefect-dask` library.
For assistance using Dask, consult the Dask documentation.
Resolving futures in sync client
By default, `dask_collection.compute()` returns concrete values, while `client.compute(dask_collection)` returns Dask Futures. Therefore, if you call `client.compute`, you must resolve all futures before exiting the context manager, either by:
- setting `sync=True`
- calling `result()`

For more information, visit the docs on Waiting on Futures.
For asynchronous tasks and flows, use `get_async_dask_client`. When using the async client, you must `await client.compute(dask_collection)` before exiting the context manager.
Note that task submission (`.submit()`) and future resolution (`.result()`) are always synchronous operations in Prefect, even when working with async tasks and flows.