prefect.utilities.asyncutils

Utilities for interoperability with async functions and workers from various contexts.

Functions

get_thread_limiter

get_thread_limiter() -> anyio.CapacityLimiter

is_async_fn

is_async_fn(func: _SyncOrAsyncCallable[P, R]) -> TypeGuard[Callable[P, Coroutine[Any, Any, Any]]]

Returns True if a function returns a coroutine.

See https://github.com/microsoft/pyright/issues/2142 for an example use

is_async_gen_fn

is_async_gen_fn(func: Callable[P, Any]) -> TypeGuard[Callable[P, AsyncGenerator[Any, Any]]]

Returns True if a function is an async generator.

create_task

create_task(coroutine: Coroutine[Any, Any, R]) -> asyncio.Task[R]

Replacement for asyncio.create_task that will ensure that tasks aren’t garbage collected before they complete. Allows for “fire and forget” behavior in which tasks can be created and the application can move on. Tasks can also be awaited normally.

See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task for details (and essentially this implementation)

run_coro_as_sync

run_coro_as_sync(coroutine: Coroutine[Any, Any, R]) -> Optional[R]

Runs a coroutine from a synchronous context, as if it were a synchronous function.

The coroutine is scheduled to run in the “run sync” event loop, which is running in its own thread and is started the first time it is needed. This allows us to share objects like async httpx clients among all coroutines running in the loop.

If run_sync is called from within the run_sync loop, it will run the coroutine in a new thread, because otherwise a deadlock would occur. Note that this behavior should not appear anywhere in the Prefect codebase or in user code.

Args:

  • coroutine: The coroutine to be run as a synchronous function.
  • force_new_thread: If True, the coroutine will always be run in a new thread. Defaults to False.
  • wait_for_result: If True, the function will wait for the coroutine to complete and return the result. If False, the function will submit the coroutine to the “run sync” event loop and return immediately, where it will eventually be run. Defaults to True.

Returns:

  • The result of the coroutine if wait_for_result is True, otherwise None.

call_with_mark

call_with_mark(call: Callable[..., R]) -> R

run_async_from_worker_thread

run_async_from_worker_thread(__fn: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R

Runs an async function in the main thread’s event loop, blocking the worker thread until completion

run_async_in_new_loop

run_async_in_new_loop(__fn: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R

mark_as_worker_thread

mark_as_worker_thread() -> None

in_async_worker_thread

in_async_worker_thread() -> bool

in_async_main_thread

in_async_main_thread() -> bool

sync_compatible

sync_compatible(async_fn: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Union[R, Coroutine[Any, Any, R]]]

Converts an async function into a dual async and sync function.

When the returned function is called, we will attempt to determine the best way to enter the async function.

  • If in a thread with a running event loop, we will return the coroutine for the caller to await. This is normal async behavior.
  • If in a blocking worker thread with access to an event loop in another thread, we will submit the async method to the event loop.
  • If we cannot find an event loop, we will create a new one and run the async method then tear down the loop.

Note: Type checkers will infer functions decorated with @sync_compatible are synchronous. If you want to use the decorated function in an async context, you will need to ignore the types and “cast” the return type to a coroutine. For example:

python result: Coroutine = sync_compatible(my_async_function)(arg1, arg2) # type: ignore

sync

sync(__async_fn: Callable[P, Awaitable[T]], *args: P.args, **kwargs: P.kwargs) -> T

Call an async function from a synchronous context. Block until completion.

If in an asynchronous context, we will run the code in a separate loop instead of failing but a warning will be displayed since this is not recommended.

create_gather_task_group

create_gather_task_group() -> GatherTaskGroup

Create a new task group that gathers results

Classes

GatherIncomplete

Used to indicate retrieving gather results before completion

GatherTaskGroup

A task group that gathers results.

AnyIO does not include gather support. This class extends the TaskGroup interface to allow simple gathering.

See https://github.com/agronholm/anyio/issues/100

This class should be instantiated with create_gather_task_group.

Methods:

start_soon

start_soon(self, func: Callable[[Unpack[PosArgsT]], Awaitable[Any]], *args: Unpack[PosArgsT]) -> UUID

get_result

get_result(self, key: UUID) -> Any

LazySemaphore