prefect.server.events.actions

The actions consumer watches for actions that have been triggered by Automations and carries them out. This module also includes the various concrete subtypes of Action.

Functions

record_action_happening

record_action_happening(id: UUID) -> None
Record that an action has happened, with an expiration of an hour.

action_has_already_happened

action_has_already_happened(id: UUID) -> bool
Check if the action has already happened

consumer

consumer() -> AsyncGenerator[MessageHandler, None]

Classes

ActionFailed

Action

An Action that may be performed when an Automation is triggered.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None
Perform the requested Action

fail

fail(self, triggered_action: 'TriggeredAction', reason: str) -> None

logging_context

logging_context(self, triggered_action: 'TriggeredAction') -> Dict[str, Any]
Common logging context for all actions

model_validate_list

model_validate_list(cls, obj: Any) -> list[Self]

reset_fields

reset_fields(self: Self) -> Self
Reset the fields of the model that are in the _reset_fields set.

Returns:
  • A new instance of the model with the reset fields.

succeed

succeed(self, triggered_action: 'TriggeredAction') -> None

DoNothing

Do nothing when an Automation is triggered.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

EmitEventAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

create_event

create_event(self, triggered_action: 'TriggeredAction') -> 'Event'
Create an event from the TriggeredAction

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

ExternalDataAction

Base class for Actions that require data from an external source such as the Orchestration API.

Methods:

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

JinjaTemplateAction

Base class for Actions that use Jinja templates supplied by the user. The templates are rendered with a context containing data from the triggered action and the orchestration API.

Methods:

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_template

validate_template(cls, template: str, field_name: str) -> str

DeploymentAction

Base class for Actions that operate on Deployments and need to infer them from events.

Methods:

deployment_id_to_use

deployment_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_deployment_requires_id

selected_deployment_requires_id(self) -> Self

DeploymentCommandAction

Executes a command against a matching deployment.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

selected_deployment_requires_id

selected_deployment_requires_id(self)

RunDeployment

Runs the given deployment with the given parameters.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

render_parameters

render_parameters(self, triggered_action: 'TriggeredAction') -> Dict[str, Any]

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_parameters

validate_parameters(cls, value: dict[str, Any] | None) -> dict[str, Any] | None

validate_template

validate_template(cls, template: str, field_name: str) -> str

PauseDeployment

Pauses the given Deployment.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

ResumeDeployment

Resumes the given Deployment.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', deployment_id: UUID, triggered_action: 'TriggeredAction') -> Response
Execute the deployment command

FlowRunAction

An action that operates on a flow run.

Methods:

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

flow_run

flow_run(self, triggered_action: 'TriggeredAction') -> UUID

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

FlowRunStateChangeAction

Changes the state of a flow run associated with the trigger.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

flow_run

flow_run(self, triggered_action: 'TriggeredAction') -> UUID

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run
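
The method listing suggests a template-method arrangement: `act` is shared, while subclasses such as `CancelFlowRun` and `SuspendFlowRun` each list their own `new_state`. The sketch below shows that pattern in isolation. Every class name, the string flow run ID, the synchronous methods, and the placeholder state values are hypothetical and chosen for illustration only; none of them is taken from Prefect.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class StateSketch:
    """Hypothetical stand-in for StateCreate."""

    type: str
    name: str


class FlowRunStateChangeSketch:
    """Hypothetical base class: act() applies whatever new_state() returns."""

    def __init__(self) -> None:
        # Records (flow_run_id, state) pairs instead of calling a real API.
        self.applied: List[Tuple[str, StateSketch]] = []

    def new_state(self, flow_run_id: str) -> StateSketch:
        raise NotImplementedError

    def act(self, flow_run_id: str) -> None:
        # A real action would send the state to the orchestration API here.
        self.applied.append((flow_run_id, self.new_state(flow_run_id)))


class CancelSketch(FlowRunStateChangeSketch):
    """Hypothetical subclass that only decides which state to request."""

    def new_state(self, flow_run_id: str) -> StateSketch:
        # Placeholder values, not confirmed against Prefect's state model.
        return StateSketch(type="CANCELLING", name="Cancelling")
```

With this split, adding a new state-changing action means overriding one method, while the lookup of the flow run and the API call stay in the shared `act`.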

ChangeFlowRunState

Changes the state of a flow run associated with the trigger.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run

CancelFlowRun

Cancels a flow run associated with the trigger.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run

SuspendFlowRun

Suspends a flow run associated with the trigger.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

new_state

new_state(self, triggered_action: 'TriggeredAction') -> StateCreate
Return the new state for the flow run

ResumeFlowRun

Resumes a paused or suspended flow run associated with the trigger.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

flow_run

flow_run(self, triggered_action: 'TriggeredAction') -> UUID

CallWebhook

Call a webhook when an Automation is triggered.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

ensure_payload_is_a_string

ensure_payload_is_a_string(cls, value: Union[str, Dict[str, Any], None]) -> Optional[str]
Temporary measure while payloads are migrated from dictionaries to string templates. This covers both reading from the database, where values may currently be a dictionary, and the API, where older versions of the frontend may send a JSON object with the single "message" key.
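
A validator with this signature could normalize the three accepted input shapes as sketched below. This is an illustration under stated assumptions, not the Prefect source: in particular, serializing a legacy dictionary with `json.dumps` is assumed here, and the standalone function form omits the class and validator decorator that the real method would be attached to.

```python
import json
from typing import Any, Dict, Optional, Union


def ensure_payload_is_a_string(
    value: Union[str, Dict[str, Any], None],
) -> Optional[str]:
    """Coerce a legacy dictionary payload into a string template."""
    if value is None:
        return None
    if isinstance(value, str):
        # Already in the new string-template form; pass it through unchanged.
        return value
    # Legacy shape: a JSON object. Serializing it is an assumption of this sketch.
    return json.dumps(value, indent=4)
```

Under this sketch, a stored value such as `{"message": "hi"}` becomes a JSON string that can then be validated and rendered like any other template.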

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_payload_templates

validate_payload_templates(cls, value: Optional[str]) -> Optional[str]
Validate user-provided payload template.

validate_template

validate_template(cls, template: str, field_name: str) -> str

SendNotification

Send a notification when an Automation is triggered.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

instantiate_object

instantiate_object(self, model: Type[PrefectBaseModel], data: Dict[str, Any], triggered_action: 'TriggeredAction', resource: Optional['Resource'] = None) -> PrefectBaseModel

is_valid_template

is_valid_template(cls, value: str, info: ValidationInfo) -> str

render

render(self, triggered_action: 'TriggeredAction') -> List[str]

templates_in_dictionary

templates_in_dictionary(cls, dict_: dict[Any, Any | dict[Any, Any]]) -> list[tuple[dict[Any, Any], dict[Any, str]]]

validate_template

validate_template(cls, template: str, field_name: str) -> str

WorkPoolAction

Base class for Actions that operate on Work Pools and need to infer them from events.

Methods:

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_work_pool_requires_id

selected_work_pool_requires_id(self) -> Self

work_pool_id_to_use

work_pool_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

WorkPoolCommandAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_pool: WorkPool, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Pool

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

target_work_pool

target_work_pool(self, triggered_action: 'TriggeredAction') -> WorkPool

PauseWorkPool

Pauses a Work Pool.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_pool: WorkPool, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Pool

target_work_pool

target_work_pool(self, triggered_action: 'TriggeredAction') -> WorkPool

ResumeWorkPool

Resumes a Work Pool.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_pool: WorkPool, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Pool

target_work_pool

target_work_pool(self, triggered_action: 'TriggeredAction') -> WorkPool

WorkQueueAction

Base class for Actions that operate on Work Queues and need to infer them from events.

Methods:

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_work_queue_requires_id

selected_work_queue_requires_id(self) -> Self

work_queue_id_to_use

work_queue_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

WorkQueueCommandAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_queue_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Queue

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

selected_work_queue_requires_id

selected_work_queue_requires_id(self) -> Self

PauseWorkQueue

Pauses a Work Queue.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_queue_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Queue

ResumeWorkQueue

Resumes a Work Queue.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, orchestration: 'OrchestrationClient', work_queue_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Work Queue

AutomationAction

Base class for Actions that operate on Automations and need to infer them from events.

Methods:

automation_id_to_use

automation_id_to_use(self, triggered_action: 'TriggeredAction') -> UUID

describe_for_cli

describe_for_cli(self) -> str
A human-readable description of the action

selected_automation_requires_id

selected_automation_requires_id(self) -> Self

AutomationCommandAction

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, events: PrefectServerEventsAPIClient, automation_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Automation

events_api_client

events_api_client(self, triggered_action: 'TriggeredAction') -> PrefectServerEventsAPIClient

orchestration_client

orchestration_client(self, triggered_action: 'TriggeredAction') -> 'OrchestrationClient'

reason_from_response

reason_from_response(self, response: Response) -> str

selected_automation_requires_id

selected_automation_requires_id(self) -> Self

PauseAutomation

Pauses an Automation.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, events: PrefectServerEventsAPIClient, automation_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Automation

ResumeAutomation

Resumes an Automation.

Methods:

act

act(self, triggered_action: 'TriggeredAction') -> None

command

command(self, events: PrefectServerEventsAPIClient, automation_id: UUID, triggered_action: 'TriggeredAction') -> Response
Issue the command to the Automation