Available state change hooks

Type | Flow | Task | Description
on_completion | Yes | Yes | Executes when a flow or task run enters a Completed state.
on_failure | Yes | Yes | Executes when a flow or task run enters a Failed state.
on_cancellation | Yes | - | Executes when a flow run enters a Cancelling state.
on_crashed | Yes | - | Executes when a flow run enters a Crashed state.
on_running | Yes | Yes | Executes when a flow or task run enters a Running state.

Note that the on_rollback hook for tasks is not a proper state change hook but instead is a transaction lifecycle hook. Rollback hooks accept one argument representing the transaction for the task.

Send a notification when a workflow run fails

To send a notification when a flow or task run fails, you can specify an on_failure hook.
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import get_current_settings


@flow(retries=1)
def failing_flow():
    raise ValueError("oops!")


@failing_flow.on_failure
def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load(
        "slack-webhook/my-slack-webhook"
    )

    PREFECT_API_URL = get_current_settings().api.url

    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n{state.message}\n\n"
            # api.url already includes its scheme, so none is prepended here
            f"See <{PREFECT_API_URL}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )

if __name__ == "__main__":
    failing_flow()
Retries are configured in this example, so the on_failure hook will not run until all retries have completed and the flow run enters a Failed state.
State change hooks execute client side. They run in the same process as your workflow, and execution cannot be guaranteed. For more robust execution of logic in response to state changes, use an Automation.

Execute code when a task starts running

The on_running hook executes when a task enters a Running state, before the task body executes. This is useful for logging, metrics, or setting up runtime state:
from prefect import flow, task
import time

def record_start_time(task, task_run, state):
    print(f"Task {task.name} started at {time.time()}")

@task(on_running=[record_start_time])
def my_task():
    return "hello"

# Or use the decorator pattern
@task
def another_task():
    return "world"

@another_task.on_running
def log_start(task, task_run, state):
    print(f"Starting {task.name}!")

@flow
def my_flow():
    my_task()
    another_task()
Note that on_running hooks execute synchronously before the task body runs. If your hook takes 10 seconds, the task waits 10 seconds before starting. When retries are configured, on_running hooks fire on each retry attempt, including the initial run. For example, a task configured with retries=2 will trigger its on_running hooks up to three times: once on the initial run and once for each retry attempt.
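The retry arithmetic above can be illustrated with a small model that does not use Prefect at all. The `run_with_hooks` function below is a hypothetical stand-in for a task runner, and its hook signatures are simplified (real hooks receive `task`, `task_run`, and `state`). It is a sketch of the described behavior, not Prefect's implementation: running hooks are called synchronously before each attempt, and failure hooks are called once, after the last attempt fails.

```python
import time
from typing import Callable, List, Optional


def run_with_hooks(
    body: Callable[[], object],
    retries: int,
    on_running: List[Callable[[int], None]],
    on_failure: List[Callable[[Exception], None]],
) -> object:
    """Toy runner (hypothetical, not Prefect's engine)."""
    last_error: Optional[Exception] = None
    for attempt in range(retries + 1):  # initial run plus each retry
        # Hooks are called in-line, so the body waits until they return.
        for hook in on_running:
            hook(attempt)
        try:
            return body()
        except Exception as exc:
            last_error = exc
    # Reached only when every attempt failed (the final Failed state).
    for hook in on_failure:
        hook(last_error)
    return None


events: List[str] = []


def slow_running_hook(attempt: int) -> None:
    time.sleep(0.01)  # any delay here postpones the body by the same amount
    events.append(f"running:{attempt}")


def failure_hook(exc: Exception) -> None:
    events.append(f"failed:{exc}")


def always_fails() -> None:
    events.append("body")
    raise ValueError("oops!")


run_with_hooks(
    always_fails,
    retries=2,
    on_running=[slow_running_hook],
    on_failure=[failure_hook],
)
print(events)
```

With `retries=2`, the model records three `running:` entries (attempts 0, 1, and 2), each ahead of its `body` entry, followed by a single `failed:` entry.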

Pass kwargs to state change hooks

You can combine the with_options method with functools.partial to pass arbitrary keyword arguments to your hooks:
from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, x=x, y=y)]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}
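
The pattern works because functools.partial returns a callable with the extra keyword arguments already bound, so the caller only needs to supply the three positional hook arguments. The sketch below shows that mechanism in isolation, without Prefect; the placeholder strings are assumptions standing in for the real task, task run, and state objects.

```python
from functools import partial

received = {}


def my_hook(task, task_run, state, **kwargs):
    # Record everything the hook was called with.
    received.update(task=task, task_run=task_run, state=state, **kwargs)


# Bind the extra keyword arguments ahead of time.
bound_hook = partial(my_hook, x="foo", y=42)

# Whoever invokes the hook passes only the three positional arguments;
# these strings are placeholders, not real Prefect objects.
bound_hook("fake-task", "fake-task-run", "fake-state")

print(received)
```

The bound values remain inspectable through `bound_hook.keywords`, which can help when debugging which arguments a given hook carries.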