Upgrading from agents to workers significantly enhances the experience of deploying flows by simplifying the specification of each flow’s infrastructure and runtime environment.

This guide is for users on prefect<3.0 who are upgrading from agents to workers. If you are new to Prefect, we recommend starting with the Prefect Quickstart.

About workers and agents

A worker is the fusion of an agent with an infrastructure block. Like agents, workers poll a work pool for flow runs that are scheduled to start. Like infrastructure blocks, workers are typed. They work with only one kind of infrastructure, and they specify the default configuration for jobs submitted to that infrastructure.

Accordingly, workers are not a drop-in replacement for agents. Using workers requires deploying flows differently. In particular, deploying a flow with a worker does not involve specifying an infrastructure block. Instead, infrastructure configuration is specified on the work pool and passed to each worker that polls work from that pool.

Upgrade enhancements

Workers

  • Improved visibility into the status of each worker, including when a worker was started and when it last polled.
  • Better handling of race conditions for high availability use cases.

Work pools

  • Work pools allow greater customization and governance of infrastructure parameters for deployments through their base job template.
  • Prefect Cloud push work pools enable flow execution in your cloud provider environment without the need to host a worker.
  • Prefect Cloud managed work pools allow you to run flows on Prefect’s infrastructure, without the need to host a worker or configure cloud provider infrastructure.

Improved deployment interfaces

  • The Python deployment experience with .deploy() and the alternative deployment experience with prefect.yaml are both more flexible and easier to use than block- and agent-based deployments.
  • Both options allow you to deploy multiple flows with a single command.
  • Both options allow you to build Docker images for your flows to create portable execution environments.
  • The YAML-based API supports templating to enable DRYer deployment definitions.

Upgrade changes

  1. Deployment CLI and Python SDK:

    prefect deployment build <entrypoint>/prefect deployment apply —> prefect deploy

    Prefect now automatically detects flows in your repo and provides a wizard
    to guide you through setting required attributes for your deployments.

    Deployment.build_from_flow —> flow.deploy

  2. Configuring remote flow code storage:

    storage blocks —> pull action

    When using the YAML-based deployment API, you can configure a pull action in your prefect.yaml file to specify how to retrieve flow code for your deployments. You can use configuration from your existing storage blocks to define your pull action through templating.

    When using the Python deployment API, you can pass any storage block to the flow.deploy method to specify how to retrieve flow code for your deployment.

  3. Configuring flow run infrastructure:

    infrastructure blocks —> typed work pool

    Default infrastructure configuration is now set on the typed work pool and can be overridden by individual deployments.

  4. Managing multiple deployments:

    Create or update many deployments at once, either through a prefect.yaml file or with the deploy function.
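
As a rough illustration of item 3, the sketch below models how deployment-level job variables could take precedence over the defaults set on a work pool. It is a simplified model in plain Python, not Prefect's implementation: the function name resolve_job_variables and the default values are assumptions made for this example, and only pull_policy is taken from the examples later in this guide.

```python
from copy import deepcopy
from typing import Any, Dict, Optional


def resolve_job_variables(
    pool_defaults: Dict[str, Any],
    deployment_overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return the pool defaults with deployment-level values layered on top.

    Hypothetical helper for illustration only; it performs a shallow merge.
    """
    resolved = deepcopy(pool_defaults)  # leave the pool defaults untouched
    resolved.update(deployment_overrides or {})
    return resolved


# Illustrative defaults for a work pool (assumed values).
pool_defaults = {"namespace": "default", "pull_policy": "IfNotPresent"}

# Deployment-level job variables, as passed to flow.deploy(job_variables=...).
job_variables = {"pull_policy": "Never"}

resolved = resolve_job_variables(pool_defaults, job_variables)
print(resolved)
```

Only the keys named by the deployment change; every other value still comes from the work pool.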

What’s similar

  • You can set storage blocks as the pull action in a prefect.yaml file.

  • Infrastructure blocks have configuration fields similar to typed work pools.

  • Deployment-level infrastructure overrides operate in much the same way.

    infra_overrides -> job_variables

  • The processes for starting an agent and starting a worker in your environment are virtually identical.

    prefect agent start --pool <work pool name> —> prefect worker start --pool <work pool name>
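
If the agent start command lives in startup scripts or service definitions, the swap above can be applied mechanically. The helper below is a hypothetical convenience, not a Prefect utility, and it assumes the only flag in use is --pool; review any other agent flags by hand.

```python
import re

# Matches the legacy command regardless of the spacing between words.
_AGENT_START = re.compile(r"\bprefect\s+agent\s+start\b")


def agent_to_worker_command(command: str) -> str:
    """Rewrite `prefect agent start ...` to `prefect worker start ...`.

    Hypothetical helper; flags and arguments are passed through unchanged.
    """
    return _AGENT_START.sub("prefect worker start", command)


print(agent_to_worker_command("prefect agent start --pool my-pool"))
```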

Worker Helm chart

If you host your agents in a Kubernetes cluster, you can use the Prefect worker Helm chart to host workers in your cluster.

Upgrade steps

If you have existing deployments that use infrastructure blocks, you can quickly upgrade them to be compatible with workers by following these steps:

  1. Create a work pool

This new work pool replaces your infrastructure block.

You can use the .publish_as_work_pool method on any infrastructure block to create a work pool with the same configuration.

For example, if you have a KubernetesJob infrastructure block named ‘my-k8s-job’, you can create a work pool with the same configuration with this script:

from prefect.infrastructure import KubernetesJob


KubernetesJob.load("my-k8s-job").publish_as_work_pool()

Running this script creates a work pool named ‘my-k8s-job’ with the same configuration as your infrastructure block.

Serving flows

If you are using a Process infrastructure block and a LocalFilesystem storage block (or aren’t using infrastructure and storage blocks at all), you can use flow.serve to create a deployment without specifying a work pool name or starting a worker.

This is a quick way to create a deployment for a flow and manage your deployments if you don’t need the dynamic infrastructure creation or configuration offered by workers.

  2. Start a worker

This worker replaces your agent and polls your new work pool for flow runs to execute.

prefect worker start -p <work pool name>

  3. Deploy your flows to the new work pool

To deploy your flows to the new work pool, use flow.deploy for a Pythonic deployment experience or prefect deploy for a YAML-based deployment experience.

If you currently use Deployment.build_from_flow, we recommend using flow.deploy.

If you currently use prefect deployment build and prefect deployment apply, we recommend using prefect deploy.

Use flow.deploy

If you have a Python script that uses Deployment.build_from_flow to create a deployment, you can replace it with flow.deploy.

You can translate most arguments to Deployment.build_from_flow directly to flow.deploy, but you may need to make the following changes:

  • Replace infrastructure with work_pool_name.
    • If you’ve used the .publish_as_work_pool method on your infrastructure block, use the name of the created work pool.
  • Replace infra_overrides with job_variables.
  • Replace storage with a call to flow.from_source.
    • flow.from_source loads your flow from a remote storage location and makes it deployable. You can pass your existing storage block to the source argument of flow.from_source.
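
The renames above can be sketched as a small helper that rewrites a dictionary of Deployment.build_from_flow keyword arguments. The helper name translate_build_kwargs is hypothetical and not part of Prefect; it only reshuffles plain dictionaries. Moving entrypoint to flow.from_source follows the examples later in this guide and is an assumption about your script.

```python
from typing import Any, Dict, Tuple


def translate_build_kwargs(
    old_kwargs: Dict[str, Any], work_pool_name: str
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split legacy kwargs into (flow.deploy kwargs, flow.from_source kwargs).

    Hypothetical helper for illustration; it does not call Prefect.
    """
    deploy_kwargs = dict(old_kwargs)  # copy so the input is not mutated
    source_kwargs: Dict[str, Any] = {}

    # infrastructure -> work_pool_name (the pool replaces the block)
    deploy_kwargs.pop("infrastructure", None)
    deploy_kwargs["work_pool_name"] = work_pool_name

    # infra_overrides -> job_variables
    if "infra_overrides" in deploy_kwargs:
        deploy_kwargs["job_variables"] = deploy_kwargs.pop("infra_overrides")

    # storage -> source argument of flow.from_source
    if "storage" in deploy_kwargs:
        source_kwargs["source"] = deploy_kwargs.pop("storage")
    if "entrypoint" in deploy_kwargs:
        source_kwargs["entrypoint"] = deploy_kwargs.pop("entrypoint")

    return deploy_kwargs, source_kwargs


old_kwargs = {
    "name": "my-deployment",
    "storage": "<your storage block>",  # placeholder for a loaded block
    "entrypoint": "example.py:my_flow",
    "infrastructure": "<your infrastructure block>",  # placeholder
    "infra_overrides": {"pull_policy": "Never"},
    "parameters": {"name": "Marvin"},
}

deploy_kwargs, source_kwargs = translate_build_kwargs(old_kwargs, "my-k8s-job")
print(deploy_kwargs)
print(source_kwargs)
```

With real objects in place of the placeholders, the two dictionaries line up with the arguments of flow.from_source and flow.deploy shown in the examples below.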

Below are some examples of how to translate Deployment.build_from_flow into flow.deploy.

Deploying from a local file

Using agents and Deployment.build_from_flow to deploy a flow from a local file looked like:

from prefect import flow
from prefect.deployments import Deployment


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")


if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )

When using workers, you can accomplish the same local-storage deployment with flow.deploy:

example.py
from pathlib import Path
from prefect import flow


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")


if __name__ == "__main__":
    my_flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="example.py:my_flow",
    ).deploy(
        name="my-deployment",
        parameters=dict(name="Marvin"),
        work_pool_name="local",
    )

You can then start a worker to execute scheduled runs, pulling the flow code from example.py:

# starts a worker and creates `local` Process work pool if it doesn't exist
prefect worker start --pool local

If you’d like to immediately serve this flow as a deployment without running a worker or using work pools, you can use flow.serve.

Deploying using a storage block

If you currently use a storage block to load your flow code but no infrastructure block:

from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")


if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=GitHub.load("demo-repo"),
        parameters=dict(name="Marvin"),
    )

You can use flow.from_source to load your flow from the same location and flow.deploy to create a deployment:

example.py
from prefect import flow
from prefect.blocks.system import Secret
from prefect.runner.storage import GitRepository

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")

if __name__ == "__main__":
    flow.from_source(
        source=GitRepository(
            url="https://github.com/me/myrepo.git",
            credentials={"username": "oauth2", "access_token": Secret.load("my-github-pat")},
        ),
        entrypoint="example.py:my_flow"
    ).deploy(
        name="my-deployment",
        parameters=dict(name="Marvin"),
        work_pool_name="local", # or the name of your work pool
    )

Deploy using an infrastructure block and a storage block

For the code below, you need to create a work pool from your infrastructure block and pass its name to flow.deploy as the work_pool_name argument. You also need to pass your storage block to flow.from_source as the source argument.

example.py
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub # this block class no longer exists
from prefect.infrastructure.kubernetes import KubernetesJob


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")

repo = GitHub.load("demo-repo")

if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=repo,
        entrypoint="example.py:my_flow",
        infrastructure=KubernetesJob.load("my-k8s-job"),
        infra_overrides=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )

The equivalent deployment code using flow.deploy should look like this:

example.py
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/me/myrepo.git",
        entrypoint="example.py:my_flow"
    ).deploy(
        name="my-deployment",
        work_pool_name="my-k8s-job",
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )

When using flow.from_source().deploy() with a remote source (such as a GitHub block or a string URL like https://github.com/me/myrepo.git), the flow you’re deploying doesn’t need to be available locally before running your script. See the SDK reference for more info on from_source.

Deploy via a Docker image

If you currently bake your flow code into a Docker image before deploying, you can use the image argument of flow.deploy to build a Docker image as part of your deployment process:

from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Docker image!")


if __name__ == "__main__":
    my_flow.deploy(
        name="my-deployment",
        image="my-repo/my-image:latest",
        work_pool_name="my-k8s-job",
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )

You can skip a flow.from_source call when building an image with flow.deploy. Prefect keeps track of the flow’s source code location in the image and loads it from that location when the flow is executed.

Use prefect deploy

Always run prefect deploy commands from the root level of your repo!

With agents, you may have multiple deployment.yaml files. But under worker deployment patterns, each repo has a single prefect.yaml file located at the root of the repo that contains deployment configuration for all flows in that repo.

To set up a new prefect.yaml file for your deployments, run the following command from the root level of your repo:

prefect deploy

This starts a wizard that guides you through setting up your deployment.

For step 4, select y on the last prompt to save the configuration for the deployment.

Saving the configuration for your deployment results in a prefect.yaml file populated with your first deployment. You can use this YAML file to edit and define multiple deployments for this repo.

You can add more deployments by editing the deployments list in your prefect.yaml file, by continuing to use the deployment creation wizard, or both.
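
To make the shape of the deployments list concrete, this sketch renders a minimal deployments section for several deployments. It assumes each entry needs only name, entrypoint, and work_pool.name; the helper render_deployments_section, the second deployment, and its file other.py are hypothetical, and a real prefect.yaml produced by the wizard contains additional keys.

```python
from typing import Dict, List


def render_deployments_section(deployments: List[Dict[str, str]]) -> str:
    """Render a minimal `deployments:` section as YAML text.

    Hypothetical helper; values are written unquoted, so it is only
    suitable for simple strings like the ones used here.
    """
    names = [d["name"] for d in deployments]
    if len(names) != len(set(names)):
        raise ValueError("deployment names must be unique in this sketch")

    lines = ["deployments:"]
    for d in deployments:
        lines += [
            f"- name: {d['name']}",
            f"  entrypoint: {d['entrypoint']}",
            "  work_pool:",
            f"    name: {d['work_pool_name']}",
        ]
    return "\n".join(lines) + "\n"


deployments = [
    {
        "name": "my-deployment",
        "entrypoint": "example.py:my_flow",
        "work_pool_name": "my-k8s-job",
    },
    {
        # Second entry is illustrative only.
        "name": "my-second-deployment",
        "entrypoint": "other.py:other_flow",
        "work_pool_name": "local",
    },
]

rendered = render_deployments_section(deployments)
print(rendered)
```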

For more information on deployments, check out our in-depth guide for deploying flows to work pools.