prefect.server.utilities.database

Utilities for interacting with Prefect REST API database and ORM layer.

Prefect supports both SQLite and Postgres. Many of these utilities allow the Prefect REST API to seamlessly switch between the two.

Functions

db_injector

db_injector(func: Union[_DBMethod[T, P, R], _DBFunction[P, R]]) -> Union[_Method[T, P, R], _Function[P, R]]

generate_uuid_postgresql

generate_uuid_postgresql(element: GenerateUUID, compiler: SQLCompiler, **kwargs: Any) -> str

Generates a random UUID in Postgres; requires the pgcrypto extension.

generate_uuid_sqlite

generate_uuid_sqlite(element: GenerateUUID, compiler: SQLCompiler, **kwargs: Any) -> str

Generates a random UUID in other databases (SQLite) by concatenating bytes in a way that approximates a UUID hex representation. This is sufficient for our purposes of having a random client-generated ID that is compatible with a UUID spec.

bindparams_from_clause

bindparams_from_clause(query: sa.ClauseElement) -> dict[str, sa.BindParameter[Any]]

Retrieve all non-anonymous bind parameters defined in a SQL clause

datetime_or_interval_add_postgresql

datetime_or_interval_add_postgresql(element: Union[date_add, interval_add, date_diff], compiler: SQLCompiler, **kwargs: Any) -> str

date_diff_seconds_postgresql

date_diff_seconds_postgresql(element: date_diff_seconds, compiler: SQLCompiler, **kwargs: Any) -> str

current_timestamp_sqlite

current_timestamp_sqlite(element: functions.now, compiler: SQLCompiler, **kwargs: Any) -> str

Generates the current timestamp for SQLite

date_add_sqlite

date_add_sqlite(element: date_add, compiler: SQLCompiler, **kwargs: Any) -> str

interval_add_sqlite

interval_add_sqlite(element: interval_add, compiler: SQLCompiler, **kwargs: Any) -> str

date_diff_sqlite

date_diff_sqlite(element: date_diff, compiler: SQLCompiler, **kwargs: Any) -> str

date_diff_seconds_sqlite

date_diff_seconds_sqlite(element: date_diff_seconds, compiler: SQLCompiler, **kwargs: Any) -> str

sqlite_json_operators

sqlite_json_operators(element: sa.BinaryExpression[Any], compiler: SQLCompiler, override_operator: Optional[OperatorType] = None, **kwargs: Any) -> str

Intercept the PostgreSQL-only JSON / JSONB operators and translate them to SQLite

sqlite_greatest_as_max

sqlite_greatest_as_max(element: greatest[Any], compiler: SQLCompiler, **kwargs: Any) -> str

get_dialect

get_dialect(obj: Union[str, Session, sa.Engine]) -> type[sa.Dialect]

Get the dialect of a session, engine, or connection url.

Primary use case is figuring out whether the Prefect REST API is communicating with SQLite or Postgres.

Classes

GenerateUUID

Platform-independent UUID default generator. Note the actual functionality for this class is specified in the compiles-decorated functions below

Timestamp

TypeDecorator that ensures that timestamps have a timezone.

For SQLite, all timestamps are converted to UTC (since they are stored as naive timestamps without timezones) and recovered as UTC.

Methods:

load_dialect_impl

load_dialect_impl(self, dialect: sa.Dialect) -> TypeEngine[Any]

process_bind_param

process_bind_param(self, value: Optional[datetime.datetime], dialect: sa.Dialect) -> Optional[datetime.datetime]

process_result_value

process_result_value(self, value: Optional[datetime.datetime], dialect: sa.Dialect) -> Optional[datetime.datetime]

UUID

Platform-independent UUID type.

Uses PostgreSQL’s UUID type, otherwise uses CHAR(36), storing as stringified hex values with hyphens.

Methods:

load_dialect_impl

load_dialect_impl(self, dialect: sa.Dialect) -> TypeEngine[Any]

process_bind_param

process_bind_param(self, value: Optional[Union[str, uuid.UUID]], dialect: sa.Dialect) -> Optional[str]

process_result_value

process_result_value(self, value: Optional[Union[str, uuid.UUID]], dialect: sa.Dialect) -> Optional[uuid.UUID]

JSON

JSON type that returns SQLAlchemy’s dialect-specific JSON types, where possible. Uses generic JSON otherwise.

The “base” type is postgresql.JSONB to expose useful methods prior to SQL compilation

Methods:

load_dialect_impl

load_dialect_impl(self, dialect: sa.Dialect) -> TypeEngine[Any]

process_bind_param

process_bind_param(self, value: Optional[Any], dialect: sa.Dialect) -> Optional[Any]

Prepares the given value to be used as a JSON field in a parameter binding

Pydantic

A pydantic type that converts inserted parameters to json and converts read values to the pydantic type.

Methods:

process_bind_param

process_bind_param(self, value: Optional[T], dialect: sa.Dialect) -> Optional[str]

process_result_value

process_result_value(self, value: Optional[Any], dialect: sa.Dialect) -> Optional[T]

date_add

Platform-independent way to add a timestamp and an interval

interval_add

Platform-independent way to add two intervals.

date_diff

Platform-independent difference of two timestamps. Computes d1 - d2.

date_diff_seconds

Platform-independent calculation of the number of seconds between two timestamps or from ‘now’

greatest