prefect.server.database.configurations

Classes

ConnectionTracker

A test utility that tracks the connections given out by a connection pool, making it easy to see which connections are currently checked out and open.

Methods:

clear

clear(self) -> None

on_close

on_close(self, adapted_connection: AdaptedConnection, connection_record: ConnectionPoolEntry) -> None

on_close_detached

on_close_detached(self, adapted_connection: AdaptedConnection) -> None

on_connect

on_connect(self, adapted_connection: AdaptedConnection, connection_record: ConnectionPoolEntry) -> None

track_pool

track_pool(self, pool: sa.pool.Pool) -> None
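
The callbacks above follow a simple bookkeeping pattern: record a connection when the pool opens it and forget it when the pool closes it. The following is a minimal, stdlib-only sketch of that pattern. It assumes nothing about Prefect's actual implementation: `ToyTracker` and its attribute names are hypothetical, and plain objects stand in for AdaptedConnection.

```python
import traceback


class ToyTracker:
    """Hypothetical stand-in: remembers which connections are still open."""

    def __init__(self) -> None:
        # Maps each open connection to the call stack that opened it.
        self.open_connections = {}

    def on_connect(self, connection: object) -> None:
        self.open_connections[connection] = traceback.format_stack()

    def on_close(self, connection: object) -> None:
        self.open_connections.pop(connection, None)

    def clear(self) -> None:
        self.open_connections.clear()


tracker = ToyTracker()
first, second = object(), object()
tracker.on_connect(first)
tracker.on_connect(second)
tracker.on_close(first)

# Only the connection that was never closed is still tracked.
leaked = list(tracker.open_connections)
```

Storing the opening call stack alongside each connection is one way to make a leaked connection traceable to the code that requested it.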

BaseDatabaseConfiguration

Abstract base class used to inject database connection configuration into Prefect. This configuration defines how the Prefect REST API creates and manages database connections and sessions.

Methods:

begin_transaction

begin_transaction(self, session: AsyncSession, with_for_update: bool = False) -> AbstractAsyncContextManager[AsyncSessionTransaction]
Enter a transaction for a session

create_db

create_db(self, connection: AsyncConnection, base_metadata: sa.MetaData) -> None
Create the database

drop_db

drop_db(self, connection: AsyncConnection, base_metadata: sa.MetaData) -> None
Drop the database

engine

engine(self) -> AsyncEngine
Returns a SQLAlchemy engine.

is_inmemory

is_inmemory(self) -> bool
Returns True if the database runs in memory.

session

session(self, engine: AsyncEngine) -> AsyncSession
Retrieves a SQLAlchemy session for an engine.

unique_key

unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.

AsyncPostgresConfiguration

Methods:

begin_transaction

begin_transaction(self, session: AsyncSession, with_for_update: bool = False) -> AsyncGenerator[AsyncSessionTransaction, None]
Enter a transaction for a session. This overrides BaseDatabaseConfiguration.begin_transaction, which declares the return type as AbstractAsyncContextManager[AsyncSessionTransaction].
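
The two return annotations listed for begin_transaction (AsyncGenerator and AbstractAsyncContextManager) are compatible if the concrete method is an async generator wrapped with contextlib.asynccontextmanager. This page does not state that, so treat it as an assumption. The sketch below shows the general standard-library behaviour; `begin_toy_transaction` and the `events` list are hypothetical.

```python
import asyncio
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from typing import AsyncGenerator

events = []


@asynccontextmanager
async def begin_toy_transaction(name: str) -> AsyncGenerator[str, None]:
    # Annotated as an async generator, but callers receive a context manager.
    events.append("begin")
    try:
        yield name
        events.append("commit")
    except Exception:
        events.append("rollback")
        raise


async def main() -> None:
    manager = begin_toy_transaction("txn")
    # The decorator turns the generator function into a context manager factory.
    assert isinstance(manager, AbstractAsyncContextManager)
    async with manager as txn:
        events.append(f"work in {txn}")


asyncio.run(main())
```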

create_db

create_db(self, connection: AsyncConnection, base_metadata: sa.MetaData) -> None
Create the database

drop_db

drop_db(self, connection: AsyncConnection, base_metadata: sa.MetaData) -> None
Drop the database

engine

engine(self) -> AsyncEngine
Retrieves an async SQLAlchemy engine. The signature takes no arguments beyond self; the values below are read from the configuration instance:
  • self.connection_url: The database connection string.
  • self.echo: Whether to echo SQL sent to the database.
  • self.timeout: The database statement timeout, in seconds.
Returns:
  • a SQLAlchemy engine

is_inmemory

is_inmemory(self) -> bool
Returns True if the database runs in memory.

schedule_engine_disposal

schedule_engine_disposal(self, cache_key: _EngineCacheKey) -> None
Dispose of an engine once the event loop is closing. See the caveats at add_event_loop_shutdown_callback. We attempted to lazily clean up old engines when new engines are created, but if the loop an engine is attached to is already closed, its connections cannot be cleaned up properly and warnings are displayed. Engine disposal should only matter when running the application ephemerally. Notably, this is an issue in our tests, where many short-lived event loops and engines are created and can consume all of the available database connection slots. Users operating at a scale where connection limits are encountered should be encouraged to use a standalone server.

session

session(self, engine: AsyncEngine) -> AsyncSession
Retrieves a SQLAlchemy session for an engine. Args:
  • engine: a SQLAlchemy engine

unique_key

unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.

AioSqliteConfiguration

Methods:

begin_sqlite_conn

begin_sqlite_conn(self, conn: aiosqlite.AsyncAdapt_aiosqlite_connection) -> None

begin_sqlite_stmt

begin_sqlite_stmt(self, conn: sa.Connection) -> None

begin_transaction

begin_transaction(self, session: AsyncSession, with_for_update: bool = False) -> AsyncGenerator[AsyncSessionTransaction, None]
Enter a transaction for a session. This overrides BaseDatabaseConfiguration.begin_transaction, which declares the return type as AbstractAsyncContextManager[AsyncSessionTransaction].

create_db

create_db(self, connection: AsyncConnection, base_metadata: sa.MetaData) -> None
Create the database

drop_db

drop_db(self, connection: AsyncConnection, base_metadata: sa.MetaData) -> None
Drop the database

engine

engine(self) -> AsyncEngine
Retrieves an async SQLAlchemy engine. The signature takes no arguments beyond self; the values below are read from the configuration instance:
  • self.connection_url: The database connection string.
  • self.echo: Whether to echo SQL sent to the database.
  • self.timeout: The database statement timeout, in seconds.
Returns:
  • a SQLAlchemy engine

is_inmemory

is_inmemory(self) -> bool
Returns True if the database runs in memory.

schedule_engine_disposal

schedule_engine_disposal(self, cache_key: _EngineCacheKey) -> None
Dispose of an engine once the event loop is closing. See the caveats at add_event_loop_shutdown_callback. We attempted to lazily clean up old engines when new engines are created, but if the loop an engine is attached to is already closed, its connections cannot be cleaned up properly and warnings are displayed. Engine disposal should only matter when running the application ephemerally. Notably, this is an issue in our tests, where many short-lived event loops and engines are created and can consume all of the available database connection slots. Users operating at a scale where connection limits are encountered should be encouraged to use a standalone server.

session

session(self, engine: AsyncEngine) -> AsyncSession
Retrieves a SQLAlchemy session for an engine. Args:
  • engine: a SQLAlchemy engine

setup_sqlite

setup_sqlite(self, conn: DBAPIConnection, record: ConnectionPoolEntry) -> None
Issue PRAGMA statements to SQLite on connect. PRAGMAs set this way last only for the duration of the connection, so they must be reissued for each new connection. See https://www.sqlite.org/pragma.html for more info.
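
The connection-scoped behaviour can be observed with Python's built-in sqlite3 module. This sketch uses `PRAGMA cache_size` with an arbitrary value purely as an illustration; it is not a statement about which PRAGMAs setup_sqlite issues.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.db")

    first = sqlite3.connect(path)
    first.execute("PRAGMA cache_size = -12345")  # illustrative value
    first_size = first.execute("PRAGMA cache_size").fetchone()[0]

    # A new connection to the same file does not inherit the setting.
    second = sqlite3.connect(path)
    second_size = second.execute("PRAGMA cache_size").fetchone()[0]

    first.close()
    second.close()
```

The second connection reports its own default rather than the value set on the first, which is why such settings are applied in a per-connection hook.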

unique_key

unique_key(self) -> tuple[Hashable, ...]
Returns a key used to determine whether to instantiate a new DB interface.