Prefect 3 is the most flexible, powerful, and performant version of Prefect yet. It offers more ways than ever to model your workflows. This means that some patterns supported by Prefect 2 are no longer supported with Prefect 3, or are enabled differently.

Most Prefect 2 workflows can be upgraded to Prefect 3 with no changes. This guide covers changes that you may need to make to ensure your Prefect 2 workflows keep flowing.

Prefect 3 is an upgrade to the open source prefect Python package. The Prefect 3 release does not involve any changes to Prefect Cloud. All previous and future Prefect 2 releases will continue to work with Prefect Cloud.

Module location and name changes

Some modules have been renamed, reorganized, or removed for clarity. Objects whose import paths have changed will remain functional until their six month deprecation period ends, but will raise deprecation warnings.

See the migration script for the full list of modules that have been moved or removed.

Integrations must be upgraded

Integration packages compatible with Prefect 2 generally cannot be used with Prefect 3 and vice versa. You must update all of your integration packages when upgrading from Prefect 2 to Prefect 3.

Specify extras when you install the prefect package to ensure that you install the integration package version that corresponds to your Prefect version. For example, use pip install prefect[aws] --pre to install Prefect 3 and a compatible prefect-aws version.

Asynchronous task execution behavior

Prefect 2 abstracts away certain task execution concerns, which is often convenient, but sometimes causes unexpected behavior. Prefect 3 defaults to running tasks as they would run as normal Python code, and requires you to be more explicit when you modify task execution behavior. This results in clearer behavior with fewer surprises.

In Prefect 3, all workflow code, including task functions, runs on the main thread. This means:

  • Python code that is not explicitly defined as asynchronous executes synchronously, as normal Python code does.
  • Non-threadsafe objects, such as database connections, can be shared globally within a workflow.

In Prefect 3, synchronous tasks and flows cannot be called from asynchronous tasks or flows as they could in Prefect 2.

To execute tasks asynchronously, you must either write asynchronous Python functions or use the Task.submit method to submit to a Prefect task runner. Prefect 3 encourages conventional asynchronous Python programming principles. Updating your workflows to bring them in line with these principles requires code changes. Here are some examples:

@task
async def my_task():
    ...


# Enclosing flow can be sync
@flow
def my_flow():
    return my_task()

    
if __name__ == "__main__":
    my_flow()

Because Task.submit is always a synchronous call, it does not need to be awaited. Task.submit is still non-blocking and execution of the submitted task is also still non-blocking.

You do not need to use the await keyword in front of your .submit call:

@task
async def my_task():
    ...


@flow
async def my_flow():
    # Must await `.submit()`
    future = await my_task.submit()
    return future

The PrefectFuture methods are also always synchronous in Prefect 3. Names and return values changed to reflect this:

  • PrefectFuture.get_state() is now PrefectFuture.state
  • PrefectFuture.wait() now returns None instead of a State

Wait for the future to complete, then retrieve the state separately:

@task
async def my_task():
    ...


@flow
async def my_flow():
    future = await my_task.submit()
    # State is returned by `wait()`
    state = await future.wait()
    return state

If you do not first wait for the future to complete, the run may raise an AttributeError: coroutine object has no attribute get exception.

In Prefect 3, PrefectFutures resulting from submitted task runs are not automatically awaited or resolved before exiting a flow, unless they’re a dependency of other flows or tasks. You must wait for a task’s PrefectFuture resolution before the end of a flow:

@task
def my_task():
    ...


@flow
def my_flow():
    # The flow will not exit until 
    # these futures are resolved
    futures = my_task.map([1, 2, 3, 4])

If you do not wait for the futures, the tasks may fail with a Flow must be running to start a task error. The one exception to this case is that PrefectFutures returned from a flow resolve to states before completing the flow:

Prefect 3
@flow
def my_flow():
    # Prefect will resolve these futures to 
    # states because the flow returned them, 
    # so you don't have to call `wait()`.
    return my_task.map([1, 2, 3, 4])

Task caching and result persistence

Prefect 3 supports transactional semantics — tasks runs can be grouped into larger transactions, enabling more resilient workflows. In keeping with the transactional model, Prefect 3 encourages task runs to be idempotent — intended to run only once per flow run.

To support this, Prefect 3 has more eager, intelligent task caching defaults than Prefect 2. Whenever a task runs, Prefect 3 first computes its cache key, then uses that cache key to evaluate whether that same task has run with the same inputs and as part of the same flow before. If it has, the cached result from the previous run is used. If it hasn’t, the task runs.

Tasks that may have re-run with the same inputs in Prefect 2 do not re-run in Prefect 3 unless the cache key is modified to be more precise. The cache key can be configured through the new global cache_policy setting, or through the cache_policy on a particular task. If a particular task isn’t idempotent, you can disable this behavior by setting cache_policy=NONE on the task.

This caching behavior only takes effect when result persistence is enabled. See the caching documentation for more information.

Workers replace agents

A deprecation warning was applied to Prefect interfaces related to agent-based deployments in Prefect’s 2.16.4 release, including infrastructure blocks and prefect deployment commands. Agents will continue to be supported in Prefect 2 until September 2024, but are not included in Prefect 3. If you are using agents, learn how to upgrade from agents to workers.

Type enforcement via Pydantic

If you use Pydantic V1 models for data validation and type enforcement, particularly for flow parameters and custom blocks, you must upgrade those models to Pydantic V2 to work with Prefect 3. See the Pydantic migration guide for more information.

Variables can be typed with JSON

In Prefect 2, variables are strings. To store and retrieve typed values in Prefect 2, you have to use certain single-value blocks, including JSON, Date Time, and String blocks.

In Prefect 3, variables can be any JSON-compatible type. Single-value blocks created in Prefect 2 are still accessible in Prefect 3, but you cannot create new single-value blocks.

See the set and get variables page for more information.

Self-hosted server database upgrade

If you are self-hosting a Prefect server instance, run the following command to update your database for Prefect 3:

prefect server database upgrade -y