asyncutils
prefect.utilities.asyncutils
Utilities for interoperability with async functions and workers from various contexts.
Functions
get_thread_limiter
is_async_fn
Returns True if a function returns a coroutine.
See https://github.com/microsoft/pyright/issues/2142 for an example use
is_async_gen_fn
Returns True if a function is an async generator.
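The two checks above can be illustrated with the standard library's `inspect` module. This is only a sketch of the distinction being tested, not necessarily how `is_async_fn` and `is_async_gen_fn` are implemented; the functions `fetch`, `stream`, and `plain` are made up for the example.

```python
import inspect


async def fetch() -> int:
    return 1


async def stream():
    yield 1


def plain() -> int:
    return 1


# A coroutine function returns a coroutine when called.
is_coro_fetch = inspect.iscoroutinefunction(fetch)
is_coro_plain = inspect.iscoroutinefunction(plain)

# An async generator function is a separate category from a coroutine function.
is_gen_stream = inspect.isasyncgenfunction(stream)
is_coro_stream = inspect.iscoroutinefunction(stream)
```

Here `is_coro_fetch` and `is_gen_stream` are True, while `is_coro_plain` and `is_coro_stream` are False.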
create_task
Replacement for asyncio.create_task that will ensure that tasks aren’t garbage collected before they complete. Allows for “fire and forget” behavior in which tasks can be created and the application can move on. Tasks can also be awaited normally.
See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task for details; this implementation is essentially the one described there.
run_coro_as_sync
Runs a coroutine from a synchronous context, as if it were a synchronous function.
The coroutine is scheduled to run in the “run sync” event loop, which is running in its own thread and is started the first time it is needed. This allows us to share objects like async httpx clients among all coroutines running in the loop.
If run_sync is called from within the run_sync loop, it will run the coroutine in a new thread, because otherwise a deadlock would occur. Note that this situation should not arise anywhere in the Prefect codebase or in user code.
Args:
- coroutine: The coroutine to be run as a synchronous function.
- force_new_thread: If True, the coroutine will always be run in a new thread. Defaults to False.
- wait_for_result: If True, the function will wait for the coroutine to complete and return the result. If False, the function will submit the coroutine to the “run sync” event loop and return immediately, where it will eventually be run. Defaults to True.
Returns:
- The result of the coroutine if wait_for_result is True, otherwise None.
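The idea of a shared event loop running in its own lazily started thread can be sketched with the standard library alone. The names `_get_background_loop` and `run_blocking` are hypothetical, and the sketch omits the new-thread fallback that avoids the deadlock described above.

```python
import asyncio
import threading

_loop = None
_lock = threading.Lock()


def _get_background_loop() -> asyncio.AbstractEventLoop:
    """Start the shared loop in a daemon thread the first time it is needed."""
    global _loop
    with _lock:
        if _loop is None:
            _loop = asyncio.new_event_loop()
            threading.Thread(target=_loop.run_forever, daemon=True).start()
        return _loop


def run_blocking(coro, wait_for_result: bool = True):
    """Hypothetical stand-in: submit `coro` to the shared loop from sync code."""
    future = asyncio.run_coroutine_threadsafe(coro, _get_background_loop())
    # Block for the result, or return immediately and let the loop run it later.
    return future.result() if wait_for_result else None


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


total = run_blocking(add(1, 2))
submitted = run_blocking(add(1, 1), wait_for_result=False)
```

Because every coroutine runs on the same loop, objects bound to that loop (such as an async HTTP client) can be reused across calls.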
call_with_mark
run_async_from_worker_thread
Runs an async function in the main thread’s event loop, blocking the worker thread until completion.
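The same hand-off can be sketched with plain asyncio: a worker thread submits a coroutine to the main thread's loop and blocks on the result. This is a standard-library analogue under stated assumptions, not Prefect's implementation; `double` and `blocking_work` are made-up names.

```python
import asyncio


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x


def blocking_work(loop: asyncio.AbstractEventLoop) -> int:
    # Runs in a worker thread: hand the coroutine to the main thread's loop
    # and block this thread until the result is available.
    future = asyncio.run_coroutine_threadsafe(double(21), loop)
    return future.result()


async def main() -> int:
    loop = asyncio.get_running_loop()
    # asyncio.to_thread (Python 3.9+) runs blocking_work in a worker thread
    # while the main loop stays free to execute the submitted coroutine.
    return await asyncio.to_thread(blocking_work, loop)


answer = asyncio.run(main())
```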
run_async_in_new_loop
mark_as_worker_thread
in_async_worker_thread
in_async_main_thread
sync_compatible
Converts an async function into a dual async and sync function.
When the returned function is called, we will attempt to determine the best way to enter the async function.
- If in a thread with a running event loop, we will return the coroutine for the caller to await. This is normal async behavior.
- If in a blocking worker thread with access to an event loop in another thread, we will submit the async method to the event loop.
- If we cannot find an event loop, we will create a new one and run the async method then tear down the loop.
Note: Type checkers will infer that functions decorated with @sync_compatible are synchronous. If you want to use the decorated function in an async context, you will need to ignore the types and “cast” the return type to a coroutine.
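To illustrate the dispatch and the cast mentioned in the note, here is a simplified sketch. The decorator `dual_mode` is a hypothetical stand-in, not `sync_compatible` itself: it covers only the "running loop" and "no loop" cases and leaves out the worker-thread case.

```python
import asyncio
import functools
from typing import Any, Coroutine, cast


def dual_mode(async_fn):
    """Hypothetical, simplified stand-in for a dual sync/async decorator."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread: create one, run, tear it down.
            return asyncio.run(coro)
        # A loop is already running: hand the coroutine back to be awaited.
        return coro

    return wrapper


@dual_mode
async def greet(name: str) -> str:
    await asyncio.sleep(0)
    return f"hello {name}"


sync_result = greet("sync")  # called without a running loop


async def main() -> str:
    # A type checker may treat the wrapper as synchronous, so cast the
    # return value to a coroutine before awaiting it.
    coro = cast(Coroutine[Any, Any, str], greet("async"))
    return await coro


async_result = asyncio.run(main())
```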
sync
Call an async function from a synchronous context. Block until completion.
If in an asynchronous context, we will run the code in a separate loop instead of failing, but a warning will be displayed since this is not recommended.
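A rough sketch of this behavior, assuming a call signature of an async function followed by its arguments, might look like the following. The helper `call_sync` is hypothetical, and running the separate loop in another thread is one possible approach rather than a description of what `sync` does internally.

```python
import asyncio
import threading
import warnings


def call_sync(async_fn, *args, **kwargs):
    """Hypothetical helper: call `async_fn` and block until it completes."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Ordinary synchronous context: run the coroutine in a fresh loop.
        return asyncio.run(async_fn(*args, **kwargs))

    # Already inside an event loop: warn, then use a separate loop in another thread.
    warnings.warn("call_sync used from an async context", stacklevel=2)
    outcome = {}

    def runner():
        try:
            outcome["value"] = asyncio.run(async_fn(*args, **kwargs))
        except BaseException as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=runner)
    thread.start()
    thread.join()  # blocks the calling loop, which is why this path is discouraged
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


async def square(x: int) -> int:
    await asyncio.sleep(0)
    return x * x


value = call_sync(square, 4)
```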
create_gather_task_group
Create a new task group that gathers results
Classes
GatherIncomplete
Used to indicate that gather results were retrieved before completion.
GatherTaskGroup
A task group that gathers results.
AnyIO does not include gather support. This class extends the TaskGroup interface to allow simple gathering.
See https://github.com/agronholm/anyio/issues/100
This class should be instantiated with create_gather_task_group.
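The gathering idea can be sketched without AnyIO: start tasks inside a context manager, get back a key for each, and look results up once the group has exited. Everything here is hypothetical (the class `SimpleGatherGroup`, the exception `ResultsNotReady`, and the method names `start_soon` and `get_result`), and the sketch skips the cancellation and error handling a real task group provides.

```python
import asyncio
import uuid


class ResultsNotReady(RuntimeError):
    """Hypothetical analogue of GatherIncomplete."""


class SimpleGatherGroup:
    """Hypothetical asyncio-only sketch of a task group that keeps results."""

    def __init__(self) -> None:
        self._tasks: dict = {}
        self._results: dict = {}

    async def __aenter__(self) -> "SimpleGatherGroup":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Wait for every started task, then record results by key.
        keys = list(self._tasks)
        values = await asyncio.gather(*self._tasks.values())
        self._results = dict(zip(keys, values))

    def start_soon(self, fn, *args) -> uuid.UUID:
        # Schedule fn(*args) and return a key for looking up its result later.
        key = uuid.uuid4()
        self._tasks[key] = asyncio.create_task(fn(*args))
        return key

    def get_result(self, key: uuid.UUID):
        if key not in self._results:
            raise ResultsNotReady("results are available only after the group exits")
        return self._results[key]


async def triple(x: int) -> int:
    await asyncio.sleep(0)
    return 3 * x


async def main() -> list:
    async with SimpleGatherGroup() as group:
        keys = [group.start_soon(triple, n) for n in (1, 2, 3)]
    return [group.get_result(k) for k in keys]


gathered = asyncio.run(main())
```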
Methods: