prefect.server.database.query_components

Classes

FlowRunGraphV2Node

BaseQueryComponents

Abstract base class used to inject dialect-specific SQL operations into Prefect.

Methods:

unique_key

unique_key(self) -> tuple[Hashable, ...]

Returns a key used to determine whether to instantiate a new DB interface.

insert

insert(self, obj: type[orm_models.Base]) -> Union[postgresql.Insert, sqlite.Insert]

Returns a dialect-specific insert statement.

uses_json_strings

uses_json_strings(self) -> bool

Specifies whether the configured dialect returns JSON as strings.
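As an illustration of why a caller might consult this flag, the sketch below decodes a value only when the dialect hands JSON back as text. `load_json_value` is a hypothetical helper, not a Prefect function.

```python
import json
from typing import Any


def load_json_value(raw: Any, uses_json_strings: bool) -> Any:
    """Decode a JSON column value if the dialect returned it as a string."""
    if uses_json_strings and isinstance(raw, str):
        return json.loads(raw)
    # Otherwise the driver is assumed to have decoded the value already.
    return raw


from_string_dialect = load_json_value('{"a": 1}', uses_json_strings=True)
from_native_dialect = load_json_value({"a": 1}, uses_json_strings=False)
```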

cast_to_json

cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

Casts to a JSON object if necessary.

build_json_object

build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

Builds a JSON object from sequential key-value pairs.
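The "sequential key-value pairs" layout means the arguments alternate key, value, key, value. This is the same alternating layout that SQL functions such as PostgreSQL's `jsonb_build_object` and SQLite's `json_object` accept. A plain-Python analogue of that pairing, using a hypothetical `pair_up` helper, might look like this:

```python
from typing import Any


def pair_up(*args: Any) -> dict[str, Any]:
    """Group alternating key/value arguments into a dictionary."""
    if len(args) % 2 != 0:
        raise ValueError("expected an even number of arguments")
    # Even positions are keys, odd positions are the matching values.
    return dict(zip(args[0::2], args[1::2]))


example = pair_up("id", 1, "name", "my-flow-run")
```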

json_arr_agg

json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

Aggregates a JSON array.

make_timestamp_intervals

make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]
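The signature suggests the query yields (interval start, interval end) pairs between the two timestamps. The plain-Python sketch below shows one way such pairs can be generated. The boundary handling (stop once the start reaches `end_time`, let the last interval run its full length) is an assumption, not confirmed behavior, and `timestamp_intervals` is a hypothetical name.

```python
import datetime


def timestamp_intervals(
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    interval: datetime.timedelta,
) -> list[tuple[datetime.datetime, datetime.datetime]]:
    """Return consecutive (start, end) pairs, each `interval` long."""
    if interval <= datetime.timedelta(0):
        raise ValueError("interval must be positive")
    pairs = []
    current = start_time
    # Assumption: an interval is emitted only if it starts before end_time.
    while current < end_time:
        pairs.append((current, current + interval))
        current += interval
    return pairs


intervals = timestamp_intervals(
    datetime.datetime(2024, 1, 1, 0, 0),
    datetime.datetime(2024, 1, 1, 3, 0),
    datetime.timedelta(hours=1),
)
```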

set_state_id_on_inserted_flow_runs_statement

set_state_id_on_inserted_flow_runs_statement(self, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

get_scheduled_flow_runs_from_work_queues

get_scheduled_flow_runs_from_work_queues(self, db: PrefectDBInterface, limit_per_queue: Optional[int] = None, work_queue_ids: Optional[list[UUID]] = None, scheduled_before: Optional[DateTime] = None) -> sa.Select[tuple[orm_models.FlowRun, UUID]]

Returns all scheduled runs in work queues, subject to provided parameters.

This query returns (orm_models.FlowRun, orm_models.WorkQueue.id) pairs. Calling result.all() returns both elements of each pair; calling result.scalars().unique().all() returns only the flow runs, because scalars() keeps only the first element of each row.
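The difference between the two result shapes can be illustrated without a database. The sketch below is a plain-Python analogue of what `all()` and `scalars().unique().all()` return; the row values and the `scalars_unique` helper are hypothetical stand-ins for real flow runs and SQLAlchemy's result object.

```python
from typing import Any

# Hypothetical rows shaped like (flow run, work queue id).
rows: list[tuple[str, str]] = [
    ("run-a", "queue-1"),
    ("run-b", "queue-1"),
    ("run-c", "queue-2"),
]


def scalars_unique(result_rows: list[tuple[Any, ...]]) -> list[Any]:
    """Keep the first column of each row, dropping repeated values."""
    seen = set()
    first_column = []
    for row in result_rows:
        value = row[0]
        if value not in seen:
            seen.add(value)
            first_column.append(value)
    return first_column


all_rows = list(rows)             # analogue of result.all()
only_runs = scalars_unique(rows)  # analogue of result.scalars().unique().all()
```

Callers that need the work queue ID alongside each flow run should therefore iterate over the full rows rather than the scalars.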

clear_configuration_value_cache_for_key

clear_configuration_value_cache_for_key(self, key: str) -> None

Removes a configuration key from the cache.

AsyncPostgresQueryComponents

Methods:

insert

insert(self, obj: type[orm_models.Base]) -> postgresql.Insert

uses_json_strings

uses_json_strings(self) -> bool

cast_to_json

cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

build_json_object

build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

json_arr_agg

json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

make_timestamp_intervals

make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

set_state_id_on_inserted_flow_runs_statement

set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

Given a list of flow run IDs and associated states, set the state_id to the appropriate state for all flow runs.

UUIDList

Maps a JSON list of strings back to a list of UUIDs at the result loading stage.

Methods:

process_result_value

process_result_value(self, value: Optional[list[Union[str, UUID]]], dialect: sa.Dialect) -> Optional[list[UUID]]
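A minimal sketch of the conversion this signature describes, written as a standalone function rather than a SQLAlchemy type. `to_uuid_list` is a hypothetical name, and leaving existing UUID objects untouched is an assumption based on the `Union[str, UUID]` input type.

```python
from typing import Optional, Union
from uuid import UUID


def to_uuid_list(
    value: Optional[list[Union[str, UUID]]],
) -> Optional[list[UUID]]:
    """Convert each string in the list to a UUID; pass None through."""
    if value is None:
        return None
    # Assumption: entries that are already UUIDs are kept as they are.
    return [v if isinstance(v, UUID) else UUID(v) for v in value]


converted = to_uuid_list(
    [
        "12345678-1234-5678-1234-567812345678",
        UUID("87654321-4321-8765-4321-876543218765"),
    ]
)
```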

AioSqliteQueryComponents

Methods:

insert

insert(self, obj: type[orm_models.Base]) -> sqlite.Insert

uses_json_strings

uses_json_strings(self) -> bool

cast_to_json

cast_to_json(self, json_obj: sa.ColumnElement[T]) -> sa.ColumnElement[T]

build_json_object

build_json_object(self, *args: Union[str, sa.ColumnElement[Any]]) -> sa.ColumnElement[Any]

json_arr_agg

json_arr_agg(self, json_array: sa.ColumnElement[Any]) -> sa.ColumnElement[Any]

make_timestamp_intervals

make_timestamp_intervals(self, start_time: datetime.datetime, end_time: datetime.datetime, interval: datetime.timedelta) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]

set_state_id_on_inserted_flow_runs_statement

set_state_id_on_inserted_flow_runs_statement(self, db: PrefectDBInterface, inserted_flow_run_ids: Sequence[UUID], insert_flow_run_states: Iterable[dict[str, Any]]) -> sa.Update

Given a list of flow run IDs and associated states, set the state_id to the appropriate state for all flow runs.
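The effect of such a statement can be sketched with the standard-library `sqlite3` module. This is not the statement Prefect builds: the two-column tables, the string IDs, and the correlated subquery are simplifying assumptions chosen to show the idea of pointing each inserted flow run at its state.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, heavily simplified tables.
conn.executescript(
    """
    CREATE TABLE flow_run (id TEXT PRIMARY KEY, state_id TEXT);
    CREATE TABLE flow_run_state (id TEXT PRIMARY KEY, flow_run_id TEXT);
    """
)
conn.executemany(
    "INSERT INTO flow_run (id) VALUES (?)",
    [("run-1",), ("run-2",), ("run-3",)],
)

insert_flow_run_states = [
    {"id": "state-1", "flow_run_id": "run-1"},
    {"id": "state-2", "flow_run_id": "run-2"},
]
conn.executemany(
    "INSERT INTO flow_run_state (id, flow_run_id) VALUES (:id, :flow_run_id)",
    insert_flow_run_states,
)

inserted_flow_run_ids = ["run-1", "run-2"]
placeholders = ", ".join("?" for _ in inserted_flow_run_ids)
# Set each listed flow run's state_id to the state that references it.
conn.execute(
    f"""
    UPDATE flow_run
    SET state_id = (
        SELECT s.id FROM flow_run_state AS s
        WHERE s.flow_run_id = flow_run.id
    )
    WHERE id IN ({placeholders})
    """,
    inserted_flow_run_ids,
)

state_ids = dict(conn.execute("SELECT id, state_id FROM flow_run ORDER BY id"))
```

Flow runs outside `inserted_flow_run_ids` (here `run-3`) are left unchanged.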