prefect.input.run_input

This module contains functions that allow sending type-checked RunInput data to flows at runtime. Flows can send back responses, establishing two-way channels with senders. These functions are particularly useful for systems that require ongoing data transfer or need to react to input quickly. real-time interaction and efficient data handling. It’s designed to facilitate dynamic communication within distributed or microservices-oriented systems, making it ideal for scenarios requiring continuous data synchronization and processing. It’s particularly useful for systems that require ongoing data input and output.

The following is an example of two flows. One sends a random number to the other and waits for a response. The other receives the number, squares it, and sends the result back. The sender flow then prints the result.

Sender flow:

import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def sender_flow(receiver_flow_run_id: UUID):
    logger = get_run_logger()

    the_number = random.randint(1, 100)

    await NumberData(number=the_number).send_to(receiver_flow_run_id)

    receiver = NumberData.receive(flow_run_id=receiver_flow_run_id)
    squared = await receiver.next()

    logger.info(f"{the_number} squared is {squared.number}")

Receiver flow:

import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def receiver_flow():
    async for data in NumberData.receive():
        squared = data.number ** 2
        data.respond(NumberData(number=squared))

Functions

keyset_from_paused_state

keyset_from_paused_state(state: 'State') -> Keyset

Get the keyset for the given Paused state.

Args:

  • -: the state to get the keyset for

keyset_from_base_key

keyset_from_base_key(base_key: str) -> Keyset

Get the keyset for the given base key.

Args:

  • -: the base key to get the keyset for

Returns:

    • Dict[str, str]: the keyset

run_input_subclass_from_type

run_input_subclass_from_type(_type: Union[Type[R], Type[T], pydantic.BaseModel]) -> Union[Type[AutomaticRunInput[T]], Type[R]]

Create a new RunInput subclass from the given type.

receive_input

receive_input(input_type: Union[Type[R], Type[T], pydantic.BaseModel], timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> Union[GetAutomaticInputHandler[T], GetInputHandler[R]]

Classes

RunInputMetadata

BaseRunInput

Methods:

metadata

metadata(self) -> RunInputMetadata

keyset_from_type

keyset_from_type(cls) -> Keyset

load_from_flow_run_input

load_from_flow_run_input(cls, flow_run_input: 'FlowRunInput') -> Self

Load the run input from a FlowRunInput object.

Args:

  • -: the flow run input to load the input for

with_initial_data

with_initial_data(cls: Type[R], description: Optional[str] = None, **kwargs: Any) -> Type[R]

Create a new RunInput subclass with the given initial data as field defaults.

Args:

  • -: a description to show when resuming a flow run that requires input
  • -: the initial data to populate the subclass

RunInput

Methods:

receive

receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None) -> GetInputHandler[Self]

subclass_from_base_model_type

subclass_from_base_model_type(cls, model_cls: Type[pydantic.BaseModel]) -> Type['RunInput']

Create a new RunInput subclass from the given pydantic.BaseModel subclass.

Args:

  • -: the class from which to create the new RunInput subclass

AutomaticRunInput

Methods:

subclass_from_type

subclass_from_type(cls, _type: Type[T]) -> Type['AutomaticRunInput[T]']

Create a new AutomaticRunInput subclass from the given type.

This method uses the type’s name as a key prefix to identify related flow run inputs. This helps in ensuring that values saved under a type (like List[int]) are retrievable under the generic type name (like “list”).

receive

receive(cls, timeout: Optional[float] = 3600, poll_interval: float = 10, raise_timeout_error: bool = False, exclude_keys: Optional[Set[str]] = None, key_prefix: Optional[str] = None, flow_run_id: Optional[UUID] = None, with_metadata: bool = False) -> GetAutomaticInputHandler[T]

GetInputHandler

Methods:

to_instance

to_instance(self, flow_run_input: 'FlowRunInput') -> R

GetAutomaticInputHandler

Methods:

to_instance

to_instance(self, flow_run_input: 'FlowRunInput') -> Union[T, AutomaticRunInput[T]]