Dask can run your tasks in parallel and distribute them over multiple machines. The prefect-dask integration makes it easy to accelerate your flow runs with Dask.

Getting started

Install prefect-dask

The following command will install a version of prefect-dask compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.

pip install "prefect[dask]"

Upgrade to the latest versions of prefect and prefect-dask:

pip install -U "prefect[dask]"

Why use Dask?

Say your flow downloads many images to train a machine learning model. It takes longer than you’d like for the flow to run because it executes sequentially.

To accelerate your flow code, parallelize it with prefect-dask in three steps:

  1. Add the import: from prefect_dask import DaskTaskRunner
  2. Specify the task runner in the flow decorator: @flow(task_runner=DaskTaskRunner)
  3. Submit tasks to the flow’s task runner: a_task.submit(*args, **kwargs)

Below is the sequential code without the DaskTaskRunner, followed by a sketch of the parallelized version:

# Completed in 15.2 seconds

from typing import List
from pathlib import Path

import httpx
from prefect import flow, task

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)

    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
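
Applying the three steps to the same code touches only the import, the flow decorator, and how tasks are invoked; the imports and task definition above are unchanged. A sketch of the parallelized flow:

from prefect_dask import DaskTaskRunner  # step 1: add the import

@flow(task_runner=DaskTaskRunner())  # step 2: specify the task runner
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # step 3: submit tasks to the task runner so downloads run concurrently
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image.submit(year, month, directory)
        file_paths.append(file_path)
    return file_paths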

In our tests, the flow run took 15.2 seconds to execute sequentially. Using the DaskTaskRunner reduced the runtime to 5.7 seconds!

Run tasks on Dask

The DaskTaskRunner is a task runner that submits tasks to the dask.distributed scheduler. By default, when the DaskTaskRunner is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.

If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the address kwarg.

DaskTaskRunner accepts the following optional parameters:

  • address: Address of a currently running Dask scheduler.
  • cluster_class: The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, "distributed.LocalCluster") or the class itself.
  • cluster_kwargs: Additional kwargs to pass to the cluster_class when creating a temporary Dask cluster.
  • adapt_kwargs: Additional kwargs to pass to cluster.adapt when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided.
  • client_kwargs: Additional kwargs to use when creating a dask.distributed.Client.
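
These options can be combined. The following sketch, with purely illustrative values, creates a temporary LocalCluster with adaptive scaling and custom client options:

from prefect_dask import DaskTaskRunner

# A temporary LocalCluster with adaptive scaling and custom client options.
# The specific values here are illustrative, not recommendations.
DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"threads_per_worker": 2},
    adapt_kwargs={"minimum": 1, "maximum": 4},
    client_kwargs={"set_as_default": False},
)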

Multiprocessing safety

Because the DaskTaskRunner uses multiprocessing, calls to flows in scripts must be guarded with if __name__ == "__main__": or you will encounter warnings and errors.
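
For example, a minimal guarded script looks like this:

from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def my_flow():
    ...

if __name__ == "__main__":
    my_flow()  # guarded entry point; worker processes won't re-run the script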

If you don’t provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically. The number of workers used is based on the number of cores on your machine. The default provides a mix of processes and threads that work well for most workloads. To specify this explicitly, pass values for n_workers or threads_per_worker to cluster_kwargs:

from prefect_dask import DaskTaskRunner

# Use 4 worker processes, each with 2 threads
DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)

Use a temporary cluster

The DaskTaskRunner can create a temporary cluster using any of Dask’s cluster-manager options. This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling. To configure it, provide a cluster_class. This can be:

  • A string specifying the import path to the cluster class (for example, "dask_cloudprovider.aws.FargateCluster")
  • The cluster class itself
  • A function for creating a custom cluster

You can also configure cluster_kwargs. This takes a dictionary of keyword arguments to pass to cluster_class when starting the flow run.

For example, to configure a flow to use a temporary dask_cloudprovider.aws.FargateCluster with four workers running with an image named my-prefect-image:

from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)

For larger workloads, you can accelerate execution further by distributing task runs over multiple machines.

Connect to an existing cluster

Multiple Prefect flow runs can use the same existing Dask cluster. You might manage a single long-running Dask cluster (for example, using the Dask Helm Chart) and configure flows to connect to it during execution. This has disadvantages compared to using a temporary Dask cluster:

  • All workers in the cluster must have dependencies installed for all flows you intend to run.
  • Multiple flow runs may compete for resources. Dask tries to do a good job sharing resources between tasks, but you may still run into issues.

Still, you may prefer managing a single long-running Dask cluster.

To configure a DaskTaskRunner to connect to an existing cluster, pass in the address of the scheduler to the address argument:

from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
def my_flow():
    ...

Suppose you have an existing Dask client/cluster and a workload that computes a dask.dataframe.DataFrame. With prefect-dask, integrating it takes just a few steps:

  1. Add imports
  2. Add task and flow decorators
  3. Use get_dask_client context manager to distribute work across Dask workers
  4. Specify the task runner and client’s address in the flow decorator
  5. Submit the tasks to the flow’s task runner

The starting point, before applying those steps, is a plain Dask pipeline:

import dask.dataframe
import dask.datasets
import dask.distributed


client = dask.distributed.Client()

def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df


def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()


def dask_pipeline():
    df = read_data("1988", "2022")
    df_yearly_average = process_data(df)
    return df_yearly_average


if __name__ == "__main__":
    dask_pipeline()
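
Applying those five steps yields something like the following sketch. The flow connects to the existing client by passing its scheduler address to the task runner, and get_dask_client distributes the compute() call across the cluster's workers:

import dask.dataframe
import dask.datasets
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    # distribute the computation across the Dask workers
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_pipeline():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

if __name__ == "__main__":
    dask_pipeline()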

Configure adaptive scaling

A key feature of using a DaskTaskRunner is the ability to scale adaptively to the workload. Instead of specifying n_workers as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.

To do this, pass adapt_kwargs to DaskTaskRunner. This takes the following fields:

  • maximum (int or None, optional): the maximum number of workers to scale to. Set to None for no maximum.
  • minimum (int or None, optional): the minimum number of workers to scale to. Set to None for no minimum.

For example, this configures a flow to run on a FargateCluster scaling up to a maximum of 10 workers:

from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)

Use Dask annotations

Use Dask annotations to further control the behavior of tasks. For example, set the priority of tasks in the Dask scheduler:

import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def show(x):
    print(x)


@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        future = show.submit(1)  # low priority task

    with dask.annotate(priority=10):
        future = show.submit(2)  # high priority task


if __name__ == "__main__":
    my_flow()

Another common use case is resource annotations:

import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def show(x):
    print(x)

# Create a `LocalCluster` with some resource annotations
# Annotations are abstract in dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={"GPU": 1}):
        future = show.submit(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={"process": 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)


if __name__ == "__main__":
    my_flow()

Additional Resources

Refer to the prefect-dask SDK documentation to explore all the capabilities of the prefect-dask library.

For assistance using Dask, consult the Dask documentation.