prefect.server.models.flow_runs

Functions for interacting with flow run ORM objects. Intended for internal use by the Prefect REST API.

Functions

create_flow_run

create_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run: schemas.core.FlowRun, orchestration_parameters: Optional[dict[str, Any]] = None) -> orm_models.FlowRun
Creates a new flow run. If the provided flow run has a state attached, it will also be created.
Args:
  • session: a database session
  • flow_run: a flow run model
Returns:
  • orm_models.FlowRun: the newly-created flow run

update_flow_run

update_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, flow_run: schemas.actions.FlowRunUpdate) -> bool
Updates a flow run.
Args:
  • session: a database session
  • flow_run_id: the flow run id to update
  • flow_run: a flow run model
Returns:
  • whether or not matching rows were found to update
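
The boolean return value reports whether the update matched any rows. The sketch below illustrates that pattern only. It uses the standard-library sqlite3 module as a stand-in for the database session, and the table, its columns, and the helper update_name are hypothetical names that are not part of Prefect.

```python
import sqlite3
import uuid


def update_name(conn: sqlite3.Connection, run_id: str, name: str) -> bool:
    """Update one row and report whether any row matched the id (hypothetical helper)."""
    cursor = conn.execute(
        "UPDATE flow_run SET name = ? WHERE id = ?", (name, run_id)
    )
    # rowcount is the number of rows the UPDATE statement modified
    return cursor.rowcount > 0


# Hypothetical single-table schema, used only for this illustration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flow_run (id TEXT PRIMARY KEY, name TEXT)")
existing_id = str(uuid.uuid4())
conn.execute("INSERT INTO flow_run VALUES (?, ?)", (existing_id, "old-name"))

found = update_name(conn, existing_id, "new-name")          # id exists
missing = update_name(conn, str(uuid.uuid4()), "ignored")   # id does not exist
```

A caller can use a False result to distinguish "nothing to update" from a successful write, for example to return a not-found response.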

read_flow_run

read_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, for_update: bool = False) -> Optional[orm_models.FlowRun]
Reads a flow run by id.
Args:
  • session: A database session
  • flow_run_id: a flow run id
Returns:
  • Optional[orm_models.FlowRun]: the flow run, or None if no flow run with that id exists

read_flow_runs

read_flow_runs(db: PrefectDBInterface, session: AsyncSession, columns: Optional[list[str]] = None, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None, sort: schemas.sorting.FlowRunSort = schemas.sorting.FlowRunSort.ID_DESC) -> Sequence[orm_models.FlowRun]
Read flow runs.
Args:
  • session: a database session
  • columns: a list of the flow run ORM columns to load, for performance
  • flow_filter: only select flow runs whose flows match these filters
  • flow_run_filter: only select flow runs that match these filters
  • task_run_filter: only select flow runs whose task runs match these filters
  • deployment_filter: only select flow runs whose deployments match these filters
  • offset: Query offset
  • limit: Query limit
  • sort: Query sort
Returns:
  • Sequence[orm_models.FlowRun]: the matching flow runs
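
The offset, limit, and sort arguments combine in the usual query order: results are sorted first, then the offset is skipped, then at most limit rows are returned. The sketch below shows that interaction on an in-memory list. FakeRun and page are hypothetical stand-ins, integer ids replace UUIDs for readability, and the descending sort only mirrors the ID_DESC default.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class FakeRun:
    """Hypothetical stand-in for a flow run row."""
    id: int
    name: str


def page(
    runs: List[FakeRun],
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> List[FakeRun]:
    """Sort by id descending, then apply offset and limit (illustration only)."""
    ordered = sorted(runs, key=lambda r: r.id, reverse=True)
    start = offset or 0
    end = None if limit is None else start + limit
    return ordered[start:end]


runs = [FakeRun(id=i, name=f"run-{i}") for i in range(1, 8)]

first_page = page(runs, offset=0, limit=3)
second_page = page(runs, offset=3, limit=3)
last_page = page(runs, offset=6, limit=3)   # only one run remains
```

Because the sort is applied before the offset, advancing offset by limit walks the result set without gaps or repeats, provided the underlying data does not change between calls.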

cleanup_flow_run_concurrency_slots

cleanup_flow_run_concurrency_slots(session: AsyncSession, flow_run: orm_models.FlowRun) -> None
Clean up resources related to a flow run, such as releasing concurrency slots. All operations should be idempotent and safe to call multiple times. IMPORTANT: This run may no longer exist in the database when this operation occurs.
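
The idempotency requirement can be illustrated with a small in-memory model. This is a sketch of the property only, not Prefect's concurrency implementation: SlotTracker and its methods are hypothetical, and the point is that releasing a slot twice, or releasing one that was never held, is a harmless no-op.

```python
from typing import Set


class SlotTracker:
    """Hypothetical in-memory concurrency limiter keyed by run id."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.holders: Set[str] = set()

    def acquire(self, run_id: str) -> bool:
        if run_id in self.holders:
            return True  # already holds a slot
        if len(self.holders) >= self.limit:
            return False  # no free slot
        self.holders.add(run_id)
        return True

    def release(self, run_id: str) -> None:
        # discard (unlike remove) does nothing when run_id is absent,
        # which makes repeated or spurious releases safe
        self.holders.discard(run_id)


tracker = SlotTracker(limit=1)
got_a = tracker.acquire("run-a")
blocked_b = tracker.acquire("run-b")   # limit reached

tracker.release("run-a")
tracker.release("run-a")               # second call is a no-op
tracker.release("never-seen")          # unknown run is a no-op too

got_b = tracker.acquire("run-b")       # slot is free again
```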

read_task_run_dependencies

read_task_run_dependencies(session: AsyncSession, flow_run_id: UUID) -> List[DependencyResult]
Get a task run dependency map for a given flow run.

count_flow_runs

count_flow_runs(db: PrefectDBInterface, session: AsyncSession, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> int
Count flow runs.
Args:
  • session: a database session
  • flow_filter: only count flow runs whose flows match these filters
  • flow_run_filter: only count flow runs that match these filters
  • task_run_filter: only count flow runs whose task runs match these filters
  • deployment_filter: only count flow runs whose deployments match these filters
Returns:
  • count of flow runs

delete_flow_run

delete_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID) -> bool
Delete a flow run by flow_run_id, handling concurrency limits if applicable.
Args:
  • session: A database session
  • flow_run_id: a flow run id
Returns:
  • whether or not the flow run was deleted

set_flow_run_state

set_flow_run_state(session: AsyncSession, flow_run_id: UUID, state: schemas.states.State, force: bool = False, flow_policy: Optional[Type[FlowRunOrchestrationPolicy]] = None, orchestration_parameters: Optional[Dict[str, Any]] = None, client_version: Optional[str] = None) -> OrchestrationResult
Creates a new orchestrated flow run state. Setting a new state on a run is one of the principal actions governed by Prefect’s orchestration logic. Setting a new run state does not guarantee that the state is created; instead, it triggers orchestration rules that govern the proposed state. If the proposed state is considered valid, it is written to the database. Otherwise, a different state, or no state, may be created. A force flag is supplied to bypass a subset of orchestration logic.
Args:
  • session: a database session
  • flow_run_id: the flow run id
  • state: a flow run state model
  • force: if False, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.
Returns:
  • OrchestrationResult object
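
The difference between a proposed state and the state that is actually written can be sketched with a toy model. Everything here is hypothetical: ToyResult, propose_state, the string statuses, and the single rule (a run in a terminal state may not return to RUNNING) are illustrative stand-ins, not Prefect's orchestration rules or its OrchestrationResult type.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical set of terminal state names, for this illustration only
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


@dataclass
class ToyResult:
    state: Optional[str]  # the state that ends up stored
    status: str           # "ACCEPT" or "REJECT" (illustrative values)


def propose_state(
    current: Optional[str], proposed: str, force: bool = False
) -> ToyResult:
    """Apply one toy rule unless force is set, then accept the proposal."""
    if not force and current in TERMINAL and proposed == "RUNNING":
        # The rule rejects the transition; the current state is kept
        return ToyResult(state=current, status="REJECT")
    return ToyResult(state=proposed, status="ACCEPT")


normal = propose_state("PENDING", "RUNNING")
rejected = propose_state("COMPLETED", "RUNNING")
forced = propose_state("COMPLETED", "RUNNING", force=True)
```

The sketch shows why callers should inspect the returned result rather than assume the state they proposed was stored.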

read_flow_run_graph

read_flow_run_graph(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: datetime.datetime = earliest_possible_datetime()) -> Graph
Given a flow run, return the graph of its task and subflow runs. If a since datetime is provided, only return items that may have changed since that time.
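
The since argument supports incremental refreshes: a client remembers when it last fetched the graph and asks only for newer items. The sketch below shows the filtering idea on plain objects. ToyNode, its updated field, and changed_since are hypothetical, and the inclusive comparison is an assumption chosen to err on the side of returning an item.

```python
import datetime
from dataclasses import dataclass
from typing import List


@dataclass
class ToyNode:
    """Hypothetical graph item with a last-changed timestamp."""
    id: str
    updated: datetime.datetime


def changed_since(nodes: List[ToyNode], since: datetime.datetime) -> List[ToyNode]:
    """Keep only the nodes whose timestamp is at or after `since`."""
    return [node for node in nodes if node.updated >= since]


utc = datetime.timezone.utc
nodes = [
    ToyNode("task-1", datetime.datetime(2024, 1, 1, 12, 0, tzinfo=utc)),
    ToyNode("task-2", datetime.datetime(2024, 1, 1, 12, 5, tzinfo=utc)),
    ToyNode("subflow-1", datetime.datetime(2024, 1, 1, 12, 10, tzinfo=utc)),
]

last_fetch = datetime.datetime(2024, 1, 1, 12, 5, tzinfo=utc)
recent = changed_since(nodes, last_fetch)
everything = changed_since(nodes, datetime.datetime.min.replace(tzinfo=utc))
```

Passing the earliest representable datetime, as in the last line, returns every item, which matches the role of the default value in the signature above.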

with_system_labels_for_flow_run

with_system_labels_for_flow_run(session: AsyncSession, flow_run: Union[schemas.core.FlowRun, schemas.actions.FlowRunCreate]) -> schemas.core.KeyValueLabels
Augment user-supplied labels with system default labels for a flow run.

update_flow_run_labels

update_flow_run_labels(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, labels: KeyValueLabels) -> bool
Update flow run labels by patching existing labels with new values.
Args:
  • session: A database session
  • flow_run_id: the flow run id to update
  • labels: the new labels to patch into existing labels
Returns:
  • bool: whether the update was successful
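
Patching, as opposed to replacing, can be sketched with plain dictionaries. This assumes a shallow merge in which new values win on key collisions and keys absent from the patch are preserved. The helper patch_labels and the label keys are hypothetical and do not show how Prefect performs the update in the database.

```python
from typing import Dict, Union

LabelValue = Union[bool, int, float, str]


def patch_labels(
    existing: Dict[str, LabelValue], new: Dict[str, LabelValue]
) -> Dict[str, LabelValue]:
    """Return a merged copy: keys in `new` override, all others are kept."""
    return {**existing, **new}


existing = {"team": "data", "env": "staging"}   # hypothetical labels
patch = {"env": "prod", "priority": 1}

merged = patch_labels(existing, patch)
```

Under these assumptions, a caller only needs to send the labels that change; unrelated labels survive the update.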

Classes

DependencyResult

Methods:

model_validate_list

model_validate_list(cls, obj: Any) -> list[Self]

reset_fields

reset_fields(self: Self) -> Self
Reset the fields of the model that are in the _reset_fields set. Returns:
  • A new instance of the model with the reset fields.
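
The copy-and-reset behavior can be sketched with a standard-library dataclass. This assumes that "reset" means restoring each listed field to its declared default. ToyModel and its fields are hypothetical, and the real models are not dataclasses.

```python
import dataclasses
from dataclasses import dataclass
from typing import ClassVar, FrozenSet, Tuple


@dataclass(frozen=True)
class ToyModel:
    """Hypothetical model whose `id` and `tags` fields can be reset."""
    name: str
    id: int = 0
    tags: Tuple[str, ...] = ()

    # Names of the fields to restore to their defaults (not a dataclass field)
    _reset_fields: ClassVar[FrozenSet[str]] = frozenset({"id", "tags"})

    def reset_fields(self) -> "ToyModel":
        defaults = {
            f.name: f.default
            for f in dataclasses.fields(self)
            if f.name in self._reset_fields
        }
        # replace() builds a new instance and leaves `self` untouched
        return dataclasses.replace(self, **defaults)


original = ToyModel(name="example", id=42, tags=("x",))
fresh = original.reset_fields()
```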