Prefect supports transactional semantics in your workflows that allow you to roll back on task failure and configure groups of tasks that run as an atomic unit.

A transaction in Prefect corresponds to a job that needs to be done. A transaction runs at most one time, and produces a result record upon completion at a unique address specified by a dynamically computed cache key. These records can be shared across tasks and flows.

Under the hood, every Prefect task run is governed by a transaction. In the default mode of task execution, all you need to understand about transactions are the policies determining the task’s cache key computation.
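The idea of a record stored at an address derived from a cache key can be illustrated with a small standard-library sketch. This is a toy model, not Prefect's implementation: the RECORDS dictionary, compute_key, and run_once are hypothetical names introduced only for illustration, and hashing the function name together with its inputs is just one possible key policy.

```python
import hashlib
import json

# Toy in-memory record store; a hypothetical stand-in for a result location.
RECORDS: dict[str, object] = {}
CALLS = {"count": 0}


def compute_key(fn_name: str, **inputs) -> str:
    """Hash the function name and its inputs into a deterministic key."""
    payload = json.dumps({"fn": fn_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def run_once(fn, **inputs):
    """Run fn only if no record already exists at its computed key."""
    key = compute_key(fn.__name__, **inputs)
    if key in RECORDS:      # a record exists: treat the job as already done
        return RECORDS[key]
    result = fn(**inputs)
    RECORDS[key] = result   # write the record at the key's address
    return result


def expensive_sum(a: int, b: int) -> int:
    CALLS["count"] += 1     # count real executions
    return a + b


first = run_once(expensive_sum, a=1, b=2)
second = run_once(expensive_sum, a=1, b=2)  # same key: served from the record
third = run_once(expensive_sum, a=2, b=2)   # different inputs: new key
```

In this sketch the function body executes twice for three calls, because the second call finds an existing record at its key and skips execution.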

Transactions and states

Transactions and states are similar but different in important ways. Transactions determine whether a task should or should not execute, whereas states enable visibility into code execution status.

Write your first transaction

Tasks can be grouped into a common transaction using the transaction context manager:

import os
from time import sleep

from prefect import task, flow
from prefect.transactions import transaction


@task
def write_file(contents: str):
    "Writes to a file."
    with open("side-effect.txt", "w") as f:
        f.write(contents)


@write_file.on_rollback
def del_file(transaction):
    "Deletes file."
    os.unlink("side-effect.txt")


@task
def quality_test():
    "Checks contents of file."
    with open("side-effect.txt", "r") as f:
        data = f.readlines()

    if len(data) < 2:
        raise ValueError("Not enough data!")


@flow
def pipeline(contents: str):
    with transaction():
        write_file(contents)
        sleep(2)  # sleeping to give you a chance to see the file
        quality_test()


if __name__ == "__main__":
    pipeline(contents="hello world")

If you run this flow with pipeline(contents="hello world"), it will fail. Importantly, after the flow has exited, there is no "side-effect.txt" file in your working directory. This is because the write_file task’s on_rollback hook was executed when the transaction failed.

on_rollback hooks are different than on_failure hooks

Note that the on_rollback hook is executed when the quality_test task fails, not when the write_file task it is associated with fails; in this example, write_file succeeded. This is because rollbacks occur whenever the transaction a task is participating in fails, even if that failure is outside the task’s local scope. This behavior makes transactions a valuable pattern for managing pipeline failure.

Transaction lifecycle

Every transaction goes through at most four lifecycle stages:

  • BEGIN: in this phase, the transaction’s key is computed and looked up. If a record already exists at the key location, the transaction considers itself committed.
  • STAGE: in this phase, the transaction stages a piece of data to be committed to its result location. Whether this data is committed or rolled back depends on the commit mode of the transaction.
  • ROLLBACK: if the transaction encounters any error after staging, it rolls itself back and does not proceed to commit anything.
  • COMMIT: in this final phase, the transaction writes its record to its configured location. At this point the transaction is complete.

It is important to note that rollbacks only occur after the transaction has been staged. Revisiting our example from above, there are actually three transactions at play:

  • the larger transaction that begins when with transaction() is executed; this transaction remains active throughout the duration of the subtransactions within it.
  • the transaction associated with the write_file task. Upon completion of the write_file task, this transaction is now STAGED.
  • the transaction associated with the quality_test task. This transaction fails before it can be staged, causing a rollback in its parent transaction, which then rolls back any staged subtransactions. In particular, the staged write_file transaction is rolled back.
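The interaction between the three transactions can be sketched with a toy model in plain Python. This is illustrative only and does not reproduce Prefect's internals: ToyTransaction, Stage, and stage_result are hypothetical names, and rolling back children in reverse order is a choice made for this sketch.

```python
from enum import Enum


class Stage(Enum):
    PENDING = "PENDING"
    STAGED = "STAGED"
    ROLLED_BACK = "ROLLED_BACK"
    COMMITTED = "COMMITTED"


class ToyTransaction:
    """Illustrative only: a transaction that tracks its subtransactions."""

    def __init__(self, name, parent=None, on_rollback=None):
        self.name = name
        self.stage = Stage.PENDING
        self.children = []
        self.on_rollback = on_rollback
        if parent is not None:
            parent.children.append(self)

    def stage_result(self):
        self.stage = Stage.STAGED

    def rollback(self):
        # Roll back subtransactions first, newest to oldest.
        for child in reversed(self.children):
            child.rollback()
        # Only a staged transaction has anything to undo.
        if self.stage is Stage.STAGED:
            if self.on_rollback is not None:
                self.on_rollback(self)
            self.stage = Stage.ROLLED_BACK

    def commit(self):
        # Children commit together with their parent.
        for child in self.children:
            child.commit()
        if self.stage is Stage.STAGED:
            self.stage = Stage.COMMITTED


log = []


def undo(txn):
    log.append(f"undo {txn.name}")


outer = ToyTransaction("outer")
write_txn = ToyTransaction("write_file", parent=outer, on_rollback=undo)
test_txn = ToyTransaction("quality_test", parent=outer, on_rollback=undo)

try:
    write_txn.stage_result()              # write_file completed: STAGED
    raise ValueError("Not enough data!")  # quality_test fails before staging
except ValueError:
    outer.rollback()                      # parent rolls back staged children
```

Only the write_file hook runs in this sketch: quality_test never reached the staged state, so it has nothing to roll back.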

Tasks also have on_commit lifecycle hooks

In addition to the on_rollback hook, a task can also register on_commit hooks that execute whenever its transaction is committed. A task run persists its result only at transaction commit time, which could be significantly after the task’s completion time if it is within a long-running transaction.

The signature for an on_commit hook is the same as that of an on_rollback hook:

@write_file.on_commit
def confirmation(transaction):
    print("committing a record now using the task's cache key!")

Access data within transactions

Key-value pairs can be set within a transaction and accessed elsewhere within the transaction, including within the on_rollback hook.

The code below shows how to set a key-value pair within a transaction and access it within the on_rollback hook:

import os
from time import sleep

from prefect import task, flow
from prefect.transactions import transaction


@task
def write_file(filename: str, contents: str):
    "Writes to a file."
    with open(filename, "w") as f:
        f.write(contents)


@write_file.on_rollback
def del_file(txn):
    "Deletes file."
    os.unlink(txn.get("filename"))


@task
def quality_test(filename):
    "Checks contents of file."
    with open(filename, "r") as f:
        data = f.readlines()

    if len(data) < 2:
        raise ValueError("Not enough data!")


@flow
def pipeline(filename: str, contents: str):
    with transaction() as txn:
        txn.set("filename", filename)
        write_file(filename, contents)
        sleep(2)  # sleeping to give you a chance to see the file
        quality_test(filename)


if __name__ == "__main__":
    pipeline(
        filename="side-effect.txt",
        contents="hello world",
    )

The value of filename is accessible within the on_rollback hook.

Use get_transaction() to access the current transaction object, and txn.get("key") to read the value stored under a key.
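How a context-based accessor can expose the active transaction to code that was not handed it as an argument can be sketched with the standard library's contextvars module. This is a toy model under stated assumptions, not Prefect's implementation: ToyTxn, toy_transaction, and current_txn are hypothetical names, and the real get_transaction() may differ in detail.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Holds the active toy transaction, or None when outside any transaction.
_current = ContextVar("current_txn", default=None)


class ToyTxn:
    """Illustrative only: a transaction with a key-value store."""

    def __init__(self):
        self._values = {}

    def set(self, key, value):
        self._values[key] = value

    def get(self, key, default=None):
        return self._values.get(key, default)


@contextmanager
def toy_transaction():
    txn = ToyTxn()
    token = _current.set(txn)   # make txn the active transaction
    try:
        yield txn
    finally:
        _current.reset(token)   # restore whatever was active before


def current_txn():
    """Return the active toy transaction, or None if there is none."""
    return _current.get()


def cleanup_target():
    # No transaction argument is passed in; look it up from context instead.
    return current_txn().get("filename")


with toy_transaction() as txn:
    txn.set("filename", "side-effect.txt")
    seen_inside = cleanup_target()

seen_outside = current_txn()
```

Inside the with block the helper reads the stored filename; once the block exits, the accessor in this sketch returns None because no transaction is active.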