Pause or suspend a flow until it receives input
You can pause or suspend a flow until it receives input from a user in Prefect’s UI. This is useful when you need to ask for additional information or feedback before resuming a flow. These workflows are often called human-in-the-loop (HITL) systems.Human-in-the-loop interactivity
Approval workflows that pause to ask a human to confirm whether a workflow should continue are very common in the business world.
Certain types of machine learning training and artificial intelligence
workflows benefit from incorporating HITL design.
Wait for input
To receive input while paused or suspended use thewait_for_input
parameter in the pause_flow_run
or suspend_flow_run
functions.
This parameter accepts one of the following:
- A built-in type like
int
orstr
, or a built-in collection likeList[int]
- A
pydantic.BaseModel
subclass - A subclass of
prefect.input.RunInput
When to use a
RunModel
or BaseModel
instead of a built-in type”
There are a few reasons to use a RunModel
or BaseModel
. The first is that when you let Prefect automatically create one of these
classes for your input type, the field that users see in Prefect’s UI when they click “Resume” on a flow run is named value
and
has no help text to suggest what the field is. If you create a RunInput
or BaseModel
, you can change details like the field name,
help text, and default value, and users see those reflected in the “Resume” form.Types can you pass for
wait_for_input
When you pass a built-in type such as int
as an argument for the wait_for_input
parameter to pause_flow_run
or suspend_flow_run
,
Prefect automatically creates a Pydantic model containing one field annotated with the type you specified. This means you can use
any type annotation that Pydantic accepts for model fields with these functions.pydantic.BaseModel
class. This is useful if you already have a BaseModel
you want to use:
BaseModel
classes are upgraded to RunInput
classes automaticallyWhen you pass a pydantic.BaseModel
class as the wait_for_input
argument to pause_flow_run
or suspend_flow_run
, Prefect
automatically creates a RunInput
class with the same behavior as your BaseModel
and uses that instead.RunInput
classes contain extra logic that allows flows to send and receive them at runtime. You shouldn’t notice any difference.RunInput
class:
Provide initial data
Set default values for fields in your model with thewith_initial_data
method. This is useful for providing default values
for the fields in your own RunInput
class.
Expanding on the example above, you can make the name
field default to “anonymous”:
Provide a description with runtime data
You can provide a dynamic, Markdown description that appears in the Prefect UI when the flow run pauses. This feature enables context-specific prompts, enhancing clarity and user interaction. Building on the example above:Handle custom validation
Prefect uses the fields and type hints on yourRunInput
or BaseModel
class to validate the general structure of input your flow receives.
If you require more complex validation, use Pydantic model_validators.
Calling custom validation runs after the flow resumesPrefect transforms the type annotations in your
RunInput
or BaseModel
class to a JSON schema and uses that schema in
the UI for client-side validation. However, custom validation requires running Python logic defined in your RunInput
class.
Because of this, validation happens after the flow resumes, so you should handle it explicitly in your flow.
Continue reading for an example best practice.RunInput
class that uses a custom model_validator
:
model_validator
decorator to define custom validation for our ShirtOrder
class.
You can use it in a flow like this:
small
and green
, the flow run resumes successfully.
However, if the user chooses size small
and color green
, the flow run will resume, and pause_flow_run
raises a
ValidationError
exception. This causes the flow run to fail and log the error.
To avoid a flow run failure, use a while
loop and pause again if the ValidationError
exception is raised:
Send and receive input at runtime
Use thesend_input
and receive_input
functions to send input to a flow or receive input from a flow at runtime.
You don’t need to pause or suspend the flow to send or receive input.
Reasons to send or receive input without pausing or suspendingYou might want to send or receive input without pausing or suspending in scenarios where the flow run is designed to handle
real-time data. For example, in a live monitoring system, you might need to update certain parameters based on the incoming
data without interrupting the flow. Another example is having a long-running flow that continually responds to runtime input with
low latency. For example, if you’re building a chatbot, you could have a flow that starts a GPT Assistant and manages a conversation thread.
send_input
and receive_input
functions is run_type
, which should be one of the following:
- A built-in type such as
int
orstr
- A
pydantic.BaseModel
class - A
prefect.input.RunInput
class
When to use a
BaseModel
or RunInput
instead of a built-in typeMost built-in types and collections of built-in types should work with send_input
and receive_input
, but there is a caveat with
nested collection types, such as lists of tuples. For example, List[Tuple[str, float]])
. In this case, validation may happen after
your flow receives the data, so calling receive_input
may raise a ValidationError
. You can plan to catch this exception, and
consider placing the field in an explicit BaseModel
or RunInput
so your flow only receives exact type matches.See examples below of receive_input
, send_input
, and the two functions working together.Receiving input
The following flow usesreceive_input
to continually receive names and print a personalized greeting for each name it receives:
str
into receive_input
, Prefect creates a RunInput
class to manage your
input automatically. When a flow sends input of this type, Prefect uses the RunInput
class to validate the input.
If the validation succeeds, your flow receives the input in the type you specified. In this example, if the flow received a
valid string as input, the variable name_input
contains the string value.
If, instead, you pass a BaseModel
, Prefect upgrades your BaseModel
to a RunInput
class, and the variable your flow sees
(in this case, name_input
), is a RunInput
instance that behaves like a BaseModel
. If you pass in a RunInput
class,
no upgrade is needed and you’ll get a RunInput
instance.
A simpler approach is to pass types such as str
into receive_input
. If you need access to the generated
RunInput
that contains the received value, pass with_metadata=True
to receive_input
:
When to use
with_metadata=True
The primary uses of accessing the RunInput
object for a receive input are to respond to the sender with the RunInput.respond()
function, or to access the unique key for an input.name_input.value
. When Prefect generates a RunInput
for you from a built-in type,
the RunInput
class has a single field, value
, that uses a type annotation matching the type you specified.
So if you call receive_input
like this: receive_input(str, with_metadata=True)
, it’s equivalent to manually
creating the following RunInput
class and receive_input
call:
The type used in
receive_input
and send_input
must matchFor a flow to receive input, the sender must use the same type that the receiver is receiving.
This means that if the receiver is receiving GreeterInput
, the sender must send GreeterInput
. If the receiver is
receiving GreeterInput
and the sender sends the str
input that Prefect automatically upgrades to a RunInput
class,
the types won’t match; which means the receiving flow run won’t receive the input. However, the input will wait for if the flow ever
calls receive_input(str)
.Keep track of inputs you’ve already seen
By default, each time you callreceive_input
, you get an iterator that iterates over all known inputs to a specific flow run,
starting with the first received. The iterator keeps track of your current position as you iterate over it, or you can call next()
to explicitly get the next input. If you’re using the iterator in a loop, you should assign it to a variable:
JSONBlock
.
The following flow receives input for 30 seconds then suspends itself, which exits the flow and tears down infrastructure:
seen_keys_block
.
When the flow later suspends and then resumes, it reads the keys it has already seen out of the JSON block and
passes them as the exlude_keys
parameter to receive_input
.
Respond to the input’s sender
When your flow receives input from another flow, Prefect knows the sending flow run ID, so the receiving flow can respond by calling therespond
method on the RunInput
instance the flow received. There are a couple of requirements:
- Pass in a
BaseModel
orRunInput
, or usewith_metadata=True
. - The flow you are responding to must receive the same type of input you send to see it.
respond
method is equivalent to calling send_input(..., flow_run_id=sending_flow_run.id)
. But with respond
,
your flow doesn’t need to know the sending flow run’s ID.
Next, make the greeter_flow
respond to name inputs instead of printing them:
greeter
flow in place, create the flow that sends greeter
names.
Send input
Send input to a flow with thesend_input
function. This works similarly to receive_input
and, like that function,
accepts the same run_input
argument. This can be a built-in type such as str
, or else a BaseModel
or RunInput
subclass.
When to send input to a flow runSend input to a flow run as soon as you have the flow run’s ID. The flow does not have to be receiving input for you to send input.
If you send a flow input before it is receiving, it will see your input when it calls
receive_input
(as long as the types in the send_input
and receive_input
calls match).sender
flow that starts a greeter
flow run and then enters a loop—continuously
getting input from the terminal and sending it to the greeter flow:
run_deployment
starts a greeter
flow run. This requires a deployed flow running in a process.
That process begins running greeter
while sender
continues to execute. Calling run_deployment(..., timeout=0)
ensures that sender
won’t wait for the greeter
flow run to complete, because it’s running a loop and only exits when sending EXIT_SIGNAL
.
Next, the iterator returned by receive_input
as receiver
is captured. This flow works by entering a loop. On each iteration of the loop,
the flow asks for terminal input, sends that to the greeter
flow, and then runs receiver.next()
to wait until it receives the response from greeter
.
Next, the terminal user who ran this flow is allowed to exit by entering the string q
or quit
.
When that happens, the greeter
flow is sent an exit signal to shut down, too.
Finally, the new name is sent to greeter
. greeter
sends back a greeting as a string.
When you receive the greeting, print it and continue the loop that gets terminal input.
A complete example
For a complete example of usingsend_input
and receive_input
, here is what the greeter
and sender
flows look like together: