Available state change hooks

TypeFlowTaskDescription
on_completion✓✓Executes when a flow or task run enters a Completed state.
on_failure✓✓Executes when a flow or task run enters a Failed state.
on_cancellation✓-Executes when a flow run enters a Cancelling state.
on_crashed✓-Executes when a flow run enters a Crashed state.
on_running✓-Executes when a flow run enters a Running state.

Note that the on_rollback hook for tasks is not a proper state change hook but instead is a transaction lifecycle hook. Rollback hooks accept one argument representing the transaction for the task.

Send a notification when a workflow run fails

To send a notification when a flow or task run fails, you can specify a on_failure hook.

from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL


@flow(retries=1)
def failing_flow():
    raise ValueError("oops!")


@failing_flow.on_failure
def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load(
        "slack-webhook/my-slack-webhook"
    )

    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n"
            f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )

if __name__ == "__main__":
    failing_flow()

Retries are configured in this example, so the on_failure hook will not run until all retries have completed and the flow run enters a Failed state.

State change hooks execute client side

State change hooks run in the same process as your workflow and execution cannot be guaranteed. For more robust execution of logic in response to state changes, use an Automation.

Pass kwargs to state change hooks

You can compose the with_options method to effectively pass arbitrary **kwargs to your hooks:

from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, **dict(x=x, y=y))]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}