prefect.server.models.work_queues

Functions for interacting with work queue ORM objects. Intended for internal use by the Prefect REST API.

Functions

create_work_queue

create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue: Union[schemas.core.WorkQueue, schemas.actions.WorkQueueCreate]) -> orm_models.WorkQueue
Inserts a WorkQueue. If a WorkQueue with the same name already exists, an error is raised.
Args:
  • session: a database session
  • work_queue: a WorkQueue model
Returns:
  • orm_models.WorkQueue: the newly created WorkQueue

read_work_queue

read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]) -> Optional[orm_models.WorkQueue]
Reads a WorkQueue by id.
Args:
  • session: A database session
  • work_queue_id: a WorkQueue id
Returns:
  • Optional[orm_models.WorkQueue]: the WorkQueue, or None if no WorkQueue has that id

read_work_queue_by_name

read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, name: str) -> Optional[orm_models.WorkQueue]
Reads a WorkQueue by name.
Args:
  • session: A database session
  • name: a WorkQueue name
Returns:
  • Optional[orm_models.WorkQueue]: the WorkQueue, or None if no WorkQueue has that name

read_work_queues

read_work_queues(db: PrefectDBInterface, session: AsyncSession, offset: Optional[int] = None, limit: Optional[int] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> Sequence[orm_models.WorkQueue]
Read WorkQueues.
Args:
  • session: A database session
  • offset: Query offset
  • limit: Query limit
  • work_queue_filter: only select work queues matching these filters
Returns:
  • Sequence[orm_models.WorkQueue]: WorkQueues
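
As an illustration of how offset, limit, and a filter typically combine, here is a minimal in-memory sketch. It does not touch Prefect or a database: QueueRow, read_rows, and the predicate argument are hypothetical stand-ins for the ORM model, the query, and work_queue_filter, and the order of operations shown is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence


@dataclass
class QueueRow:
    """Hypothetical stand-in for a work queue row."""
    name: str
    is_paused: bool = False


def read_rows(
    rows: Sequence[QueueRow],
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    predicate: Optional[Callable[[QueueRow], bool]] = None,
) -> List[QueueRow]:
    """Filter first, then skip `offset` rows, then keep at most `limit` rows."""
    matching = [row for row in rows if predicate is None or predicate(row)]
    start = offset or 0
    end = None if limit is None else start + limit
    return matching[start:end]


rows = [QueueRow("a"), QueueRow("b", is_paused=True), QueueRow("c"), QueueRow("d")]
page = read_rows(rows, offset=1, limit=2, predicate=lambda r: not r.is_paused)
print([r.name for r in page])  # ['c', 'd']
```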

is_last_polled_recent

is_last_polled_recent(last_polled: Optional[DateTime]) -> bool
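
No description is given for this function. Judging only from its name and signature, it plausibly reports whether a queue was polled within some recent window. The sketch below shows that idea under stated assumptions: the 60-second window, the `now` parameter, and the name is_last_polled_recent_sketch are all hypothetical and are not taken from Prefect.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical window; the threshold Prefect actually uses is not stated here.
RECENT_POLL_WINDOW = timedelta(seconds=60)


def is_last_polled_recent_sketch(
    last_polled: Optional[datetime],
    now: Optional[datetime] = None,
) -> bool:
    """Return True if `last_polled` is within RECENT_POLL_WINDOW of `now`."""
    if last_polled is None:
        # A queue that has never been polled is treated as not recent.
        return False
    now = now or datetime.now(timezone.utc)
    return now - last_polled <= RECENT_POLL_WINDOW
```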

update_work_queue

update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[None]]] = None) -> bool
Update a WorkQueue by id.
Args:
  • session: A database session
  • work_queue: the work queue data
  • work_queue_id: a WorkQueue id
Returns:
  • whether or not the WorkQueue was updated
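
The shape of this call (apply changes, optionally await a callback, return a bool) can be sketched without a database. Everything below is a hypothetical stand-in: update_row and the dict-based store are not Prefect code, and firing the callback only when the status field changes is an assumption, since the source does not say when emit_status_change is invoked.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]


async def update_row(
    store: Dict[str, Row],
    row_id: str,
    changes: Row,
    emit_status_change: Optional[Callable[[Row], Awaitable[None]]] = None,
) -> bool:
    """Apply `changes` to the row; return False if the row does not exist."""
    row = store.get(row_id)
    if row is None:
        return False
    old_status = row.get("status")
    row.update(changes)
    # Assumption: notify only when the status value actually changed.
    if emit_status_change is not None and row.get("status") != old_status:
        await emit_status_change(row)
    return True


async def demo() -> Tuple[bool, bool, List[str]]:
    emitted: List[str] = []

    async def on_change(row: Row) -> None:
        emitted.append(row["status"])

    store = {"q1": {"status": "READY"}}
    updated = await update_row(store, "q1", {"status": "PAUSED"}, on_change)
    missing = await update_row(store, "nope", {"status": "PAUSED"}, on_change)
    return updated, missing, emitted


result = asyncio.run(demo())
print(result)  # (True, False, ['PAUSED'])
```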

delete_work_queue

delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> bool
Delete a WorkQueue by id.
Args:
  • session: A database session
  • work_queue_id: a WorkQueue id
Returns:
  • whether or not the WorkQueue was deleted

get_runs_in_work_queue

get_runs_in_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, limit: Optional[int] = None, scheduled_before: Optional[datetime.datetime] = None) -> Tuple[orm_models.WorkQueue, Sequence[orm_models.FlowRun]]
Get runs from a work queue.
Args:
  • session: A database session.
  • work_queue_id: The work queue id.
  • scheduled_before: Only return runs scheduled to start before this time.
  • limit: An optional limit for the number of runs to return from the queue. This limit applies to the request only. It does not affect the work queue’s concurrency limit. If limit exceeds the work queue’s concurrency limit, it will be ignored.
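
The interaction between the request limit and the queue's concurrency limit can be sketched as a small pure function. This is a simplification under stated assumptions: effective_run_limit is a hypothetical helper, and it models only the rule described above (a request limit larger than the concurrency limit has no effect). A real implementation may also account for runs already in progress, which is not modeled here.

```python
from typing import Optional


def effective_run_limit(
    limit: Optional[int],
    concurrency_limit: Optional[int],
) -> Optional[int]:
    """Return the number of runs a single request may receive, or None for no cap."""
    if concurrency_limit is None:
        # No concurrency limit on the queue: only the request limit applies.
        return limit
    if limit is None:
        # No request limit: the queue's concurrency limit is the only cap.
        return concurrency_limit
    # Both set: the smaller one wins, so an oversized request limit is ignored.
    return min(limit, concurrency_limit)


print(effective_run_limit(10, 3))  # 3
print(effective_run_limit(2, 3))   # 2
```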

ensure_work_queue_exists

ensure_work_queue_exists(session: AsyncSession, name: str) -> orm_models.WorkQueue
Checks if a work queue exists and creates it if it does not. Useful when working with deployments, agents, and flow runs that automatically create work queues. Will also create a work pool queue in the default agent pool to facilitate migration to work pools.
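
The contrast between "create, failing on a duplicate name" and "ensure exists" is the usual get-or-create pattern. The sketch below illustrates it with an in-memory store. InMemoryQueueStore and QueueRecord are hypothetical names, ValueError is a stand-in for whatever error the real function raises, and the additional work pool queue creation mentioned above is not modeled.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class QueueRecord:
    """Hypothetical stand-in for a stored work queue."""
    name: str
    id: uuid.UUID = field(default_factory=uuid.uuid4)


class InMemoryQueueStore:
    def __init__(self) -> None:
        self._by_name: Dict[str, QueueRecord] = {}

    def create(self, name: str) -> QueueRecord:
        """Insert a new record; reject duplicate names."""
        if name in self._by_name:
            raise ValueError(f"work queue {name!r} already exists")
        record = QueueRecord(name=name)
        self._by_name[name] = record
        return record

    def ensure_exists(self, name: str) -> QueueRecord:
        """Return the existing record, creating it only if it is missing."""
        existing = self._by_name.get(name)
        return existing if existing is not None else self.create(name)


store = InMemoryQueueStore()
first = store.ensure_exists("etl")
second = store.ensure_exists("etl")  # no error, same record returned
print(first is second)  # True
```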

read_work_queue_status

read_work_queue_status(session: AsyncSession, work_queue_id: UUID) -> schemas.core.WorkQueueStatusDetail
Get work queue status by id.
Args:
  • session: A database session
  • work_queue_id: a WorkQueue id
Returns:
  • Information about the status of the work queue.

record_work_queue_polls

record_work_queue_polls(db: PrefectDBInterface, session: AsyncSession, polled_work_queue_ids: Sequence[UUID], ready_work_queue_ids: Sequence[UUID]) -> None
Record that the given work queues were polled, and also update the given ready_work_queue_ids to READY.
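
A minimal in-memory sketch of the two updates described above: stamp a poll time on every polled queue, and set the status of the ready queues to READY. QueueState, record_polls, the string status values, and the `now` parameter are hypothetical; the real function operates on database rows.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional, Sequence
from uuid import UUID, uuid4


@dataclass
class QueueState:
    """Hypothetical stand-in for the polled-related columns of a work queue."""
    status: str = "NOT_READY"
    last_polled: Optional[datetime] = None


def record_polls(
    queues: Dict[UUID, QueueState],
    polled_ids: Sequence[UUID],
    ready_ids: Sequence[UUID],
    now: Optional[datetime] = None,
) -> None:
    now = now or datetime.now(timezone.utc)
    # Every polled queue gets a fresh poll timestamp.
    for queue_id in polled_ids:
        queues[queue_id].last_polled = now
    # Only the queues listed as ready have their status set to READY.
    for queue_id in ready_ids:
        queues[queue_id].status = "READY"


a, b = uuid4(), uuid4()
queues = {a: QueueState(), b: QueueState()}
stamp = datetime(2024, 1, 1, tzinfo=timezone.utc)
record_polls(queues, polled_ids=[a, b], ready_ids=[a], now=stamp)
print(queues[a].status, queues[b].status)  # READY NOT_READY
```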

mark_work_queues_ready

mark_work_queues_ready(db: PrefectDBInterface, polled_work_queue_ids: Sequence[UUID], ready_work_queue_ids: Sequence[UUID]) -> None

mark_work_queues_not_ready

mark_work_queues_not_ready(db: PrefectDBInterface, work_queue_ids: Iterable[UUID]) -> None

emit_work_queue_status_event

emit_work_queue_status_event(db: PrefectDBInterface, work_queue: orm_models.WorkQueue) -> None