States are rich objects that contain information about the status of a particular task run or flow run.

You can learn many things about a task or flow by examining its current state or the history of its states. For example, a state could tell you that a task:

  • is scheduled to make a third run attempt in an hour
  • succeeded and what data it produced
  • was scheduled to run, but later cancelled
  • used the cached result of a previous run instead of re-running
  • failed because it timed out

Only runs have states. Flows and tasks are templates that describe what a system does; only when the system runs does it take on a state.

State types

Prefect states have names and types. A state’s name is often, but not always, synonymous with its type. For example, a task run that is running for the first time has a state with the name Running and the type RUNNING. However, if the task retries, that same task run will have the name Retrying and the type RUNNING.

The distinction between types and names is subtle: state types are typically used for backing orchestration logic, whereas state names are more for visual display and bookkeeping.

The full list of states and state types includes:

| Name | Type | Terminal? | Description |
| --- | --- | --- | --- |
| Scheduled | SCHEDULED | No | The run will begin at a particular time in the future. |
| Late | SCHEDULED | No | The run’s scheduled start time has passed, but it has not transitioned to PENDING (15 seconds by default). |
| AwaitingRetry | SCHEDULED | No | The run did not complete successfully because of a code issue and had remaining retry attempts. |
| Pending | PENDING | No | The run has been submitted to execute, but is waiting on necessary preconditions to be satisfied. |
| Running | RUNNING | No | The run code is currently executing. |
| Retrying | RUNNING | No | The run code is currently executing after previously not completing successfully. |
| Paused | PAUSED | No | The run code has stopped executing until it receives manual approval to proceed. |
| Cancelling | CANCELLING | No | The infrastructure on which the code was running is being cleaned up. |
| Cancelled | CANCELLED | Yes | The run did not complete because a user determined that it should not. |
| Completed | COMPLETED | Yes | The run completed successfully. |
| Cached | COMPLETED | Yes | The run result was loaded from a previously cached value. |
| RolledBack | COMPLETED | Yes | The run completed successfully but the transaction rolled back and executed rollback hooks. |
| Failed | FAILED | Yes | The run did not complete because of a code issue and had no remaining retry attempts. |
| Crashed | CRASHED | Yes | The run did not complete because of an infrastructure issue. |
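
The name-to-type relationship in the table can be modelled in a few lines of plain Python. This is only an illustrative sketch: StateType, NAME_TO_TYPE, TERMINAL_TYPES, and is_terminal are hypothetical names defined here for the example, not imports from Prefect.

```python
from enum import Enum


class StateType(Enum):
    """State types from the table (illustrative model, not Prefect's own enum)."""
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"


# Several display names can share a single type.
NAME_TO_TYPE = {
    "Scheduled": StateType.SCHEDULED,
    "Late": StateType.SCHEDULED,
    "AwaitingRetry": StateType.SCHEDULED,
    "Pending": StateType.PENDING,
    "Running": StateType.RUNNING,
    "Retrying": StateType.RUNNING,
    "Paused": StateType.PAUSED,
    "Cancelling": StateType.CANCELLING,
    "Cancelled": StateType.CANCELLED,
    "Completed": StateType.COMPLETED,
    "Cached": StateType.COMPLETED,
    "RolledBack": StateType.COMPLETED,
    "Failed": StateType.FAILED,
    "Crashed": StateType.CRASHED,
}

# Types marked "Yes" in the "Terminal?" column.
TERMINAL_TYPES = {
    StateType.CANCELLED,
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CRASHED,
}


def is_terminal(name: str) -> bool:
    """Return True if a state with this display name is terminal."""
    return NAME_TO_TYPE[name] in TERMINAL_TYPES
```

Logic that branches on the type treats Running and Retrying identically, while anything that displays the name can still tell them apart.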

Final state determination

The final state of a flow or task run depends on several factors. Generally speaking, there are three categories of terminal states:

  • COMPLETED: a run in any COMPLETED state did not encounter any errors or exceptions and returned successfully
  • FAILED: a run in any FAILED state encountered an error during execution, such as a raised exception
  • CRASHED: a run in any CRASHED state was interrupted by an OS signal such as a KeyboardInterrupt or SIGTERM
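
The three categories can be pictured with a small plain-Python model. It is a sketch of the idea only and does not use Prefect: classify_outcome and the three sample functions are hypothetical names, and the model treats KeyboardInterrupt as the only interruption.

```python
from typing import Callable


def classify_outcome(run: Callable[[], object]) -> str:
    """Return the terminal category that a call to `run` would fall into."""
    try:
        run()
    except KeyboardInterrupt:
        # Interrupted from outside the code: modelled as CRASHED.
        return "CRASHED"
    except Exception:
        # An error raised by the code itself: modelled as FAILED.
        return "FAILED"
    # Returned without any error or exception.
    return "COMPLETED"


def ok():
    return 42


def broken():
    raise ValueError("bad input")


def interrupted():
    raise KeyboardInterrupt


print(classify_outcome(ok), classify_outcome(broken), classify_outcome(interrupted))
```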

Task return values

A task will be placed into a Completed state if it returns any Python object, with one exception: if a task explicitly returns a Prefect Failed state, the task will be marked Failed.

from prefect import task, flow
from prefect.states import Completed, Failed


@task
def toggle_task(fail: bool):
    if fail:
        return Failed(message="I was instructed to fail.")
    else:
        return Completed(message="I was instructed to succeed.")


@flow
def example():
    # this run will be set to a `Failed` state
    state_one = toggle_task(fail=True)

    # this run will be set to a `Completed` state
    state_two = toggle_task(fail=False)

    # similarly, the flow run will fail because we return a `Failed` state
    return state_one, state_two 

You can also access state objects directly within a flow through the return_state flag:

from prefect import flow, task 


@task 
def add_one(x):
    return x + 1


@flow 
def my_flow():
    result = add_one(1)
    assert isinstance(result, int) and result == 2

    state = add_one(1, return_state=True)
    assert state.is_completed() is True
    assert state.result() == 2

Returning a State via return_state=True is useful when you want to conditionally respond to the terminal states of a task or flow. For example, if state.is_failed(): ....

Flow return values

The final state of a flow is determined by its return value. The following rules apply:

  • If an exception is raised directly in the flow function, the flow run is marked as FAILED.
  • If a flow returns a manually created state, it is used as the final state of the flow run. This lets you determine the final state manually.
  • If a flow returns an iterable of states, the presence of any FAILED state will cause the run to be marked as FAILED.

In any other situation in which the flow returns without error, it will be marked as COMPLETED.
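
As a rough illustration, these rules can be written as a plain-Python decision function. This is a sketch, not Prefect's implementation: FakeState and final_flow_state are hypothetical names, and the model assumes that only a non-empty list, tuple, or set whose items are all states counts as "an iterable of states".

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class FakeState:
    """Stand-in for a state object; only the type matters in this model."""
    type: str


def final_flow_state(flow_fn: Callable[[], Any]) -> str:
    """Apply the rules to a flow function and return the final state type."""
    try:
        returned = flow_fn()
    except Exception:
        # Rule 1: an exception raised directly in the flow function.
        return "FAILED"
    if isinstance(returned, FakeState):
        # Rule 2: a manually created state is used as-is.
        return returned.type
    if (
        isinstance(returned, (list, tuple, set))
        and returned
        and all(isinstance(item, FakeState) for item in returned)
    ):
        # Rule 3: any FAILED state in an iterable of states fails the run.
        if any(item.type == "FAILED" for item in returned):
            return "FAILED"
    # Anything else that returns without error.
    return "COMPLETED"
```

Under this assumption, a tuple that mixes a plain value with a failed state is not an iterable of states, so the model reports COMPLETED for it.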

If you manipulate states programmatically, you can create situations in which tasks within a flow can fail and not cause flow run failure. For example:

from prefect import flow, task 


@task 
def add_one(x):
    return x + 1


@flow 
def my_flow():
    # avoided raising an exception via `return_state=True`
    state = add_one("1", return_state=True)
    assert state.is_failed()

    # the flow function returns successfully!
    return

If state were returned from the flow function, the run would be marked as FAILED.

Execute code on state changes

State change hooks execute code in response to client-side changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.

A state hook must have the following signature:

def my_hook(obj: Task | Flow, run: TaskRun | FlowRun, state: State) -> None:
    ...

Both task and flow run hooks can be specified through a keyword argument or through decorator syntax:

from prefect import task, flow

# for type hints only
from prefect import Task
from prefect.context import TaskRun
from prefect.states import State


def first_task_hook(tsk: Task, run: TaskRun, state: State) -> None:
    if state.name != 'Cached':
        print('I run anytime this task executes successfully')
    else:
        print('and can condition my behavior on details of this run')


@task(log_prints=True, on_completion=[first_task_hook])
def nice_task(name: str):
    print(f"Hello {name}!")


# alternatively, hooks can be specified via decorator
@nice_task.on_completion
def second_hook(tsk: Task, run: TaskRun, state: State) -> None:
    print('another hook')

State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition, or to use the same state change hook for different transitions:

from prefect import task

def my_success_hook(task, task_run, state):
    print("Task run succeeded!")

def my_failure_hook(task, task_run, state):
    print("Task run failed!")

def my_succeed_or_fail_hook(task, task_run, state):
    print("If the task run succeeds or fails, this hook runs.")

@task(
    on_completion=[my_success_hook, my_succeed_or_fail_hook],
    on_failure=[my_failure_hook, my_succeed_or_fail_hook]
)
def my_task():
    # hypothetical task body; any task function can go here
    print("Running my task")
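
The many-to-many relationship between hooks and transitions can be modelled without Prefect. In this sketch, registry and enter_state are hypothetical names, plain strings stand in for the task, run, and state objects, and calling hooks in list order is an assumption of the model rather than a documented guarantee.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

# A hook takes (obj, run, state); plain strings stand in for real objects here.
Hook = Callable[[str, str, str], None]

calls: List[str] = []


def success_hook(obj, run, state):
    calls.append(f"success:{state}")


def failure_hook(obj, run, state):
    calls.append(f"failure:{state}")


def either_hook(obj, run, state):
    calls.append(f"either:{state}")


# Each transition maps to a list of hooks; one hook may appear under several.
registry: DefaultDict[str, List[Hook]] = defaultdict(list)
registry["Completed"] += [success_hook, either_hook]
registry["Failed"] += [failure_hook, either_hook]


def enter_state(obj: str, run: str, state: str) -> None:
    """Call every hook registered for `state`, in registration order."""
    for hook in registry[state]:
        hook(obj, run, state)


enter_state("my_task", "run-1", "Completed")
enter_state("my_task", "run-2", "Failed")
print(calls)
```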

Available state change hooks

| Type | Flow | Task | Description |
| --- | --- | --- | --- |
| on_completion | Yes | Yes | Executes when a flow or task run enters a Completed state. |
| on_failure | Yes | Yes | Executes when a flow or task run enters a Failed state. |
| on_cancellation | Yes | - | Executes when a flow run enters a Cancelling state. |
| on_crashed | Yes | - | Executes when a flow run enters a Crashed state. |
| on_running | Yes | - | Executes when a flow run enters a Running state. |

Note that the on_rollback hook for tasks is not a proper state change hook but instead is a transaction lifecycle hook. Rollback hooks accept one argument representing the transaction for the task.

Pass kwargs to state change hooks

You can combine functools.partial with the with_options method to pass arbitrary keyword arguments (**kwargs) to your hooks:

from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, **dict(x=x, y=y))]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}
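
The part of this pattern that binds the extra arguments is functools.partial from the standard library, and it can be tried in isolation. In the sketch below, the strings "a-task", "a-task-run", and "a-state" are placeholders for the objects that would normally be passed to a hook, and seen is a hypothetical dictionary used only to record the call.

```python
from functools import partial

seen = {}


def my_hook(task, task_run, state, **kwargs):
    # Record the state plus whatever keyword arguments were bound in advance.
    seen.update(state=state, **kwargs)


# Bind the extra keyword arguments up front ...
bound_hook = partial(my_hook, x="foo", y=42)

# ... so the caller only supplies the three standard hook arguments.
bound_hook("a-task", "a-task-run", "a-state")

print(seen)
```

Because the bound callable still accepts the three positional hook arguments, it can be placed in a hook list wherever an ordinary hook function would go.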

Example usage: send a notification when a flow run fails

State change hooks enable you to customize messages sent when tasks transition between states, such as sending notifications containing sensitive information when tasks enter a Failed state.

Here’s an example of running a client-side hook upon a flow run entering a Failed state:

from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL

def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load(
        "slack-webhook/my-slack-webhook"
    )
            
    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n{state.message}\n\n"
            f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )

@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
    raise ValueError("oops!")

if __name__ == "__main__":
    failing_flow()

Note that retries are configured in this example. This means the on_failure hook does not run until all retries have been exhausted and the flow run enters a Failed state.
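
The interaction between retries and the failure hook can be pictured with a plain-Python retry loop. This is a model of the behavior described above, not Prefect's retry engine: run_with_retries, on_failure, and notifications are hypothetical names.

```python
from typing import Callable, List

notifications: List[str] = []


def on_failure(attempts: int) -> None:
    """Stand-in for a failure hook; records that it was called."""
    notifications.append(f"failed after {attempts} attempts")


def run_with_retries(fn: Callable[[], object], retries: int) -> str:
    """Run `fn` up to `retries + 1` times; call on_failure only at the end."""
    total_attempts = retries + 1
    for _ in range(total_attempts):
        try:
            fn()
        except Exception:
            # Not Failed yet while attempts remain, so no hook is called here.
            continue
        return "Completed"
    # Every attempt raised: the run is now Failed and the hook fires once.
    on_failure(total_attempts)
    return "Failed"


def failing():
    raise ValueError("oops!")


print(run_with_retries(failing, retries=1), notifications)
```

With retries=1 the function is attempted twice, and the hook is recorded once, after the second attempt fails.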