Automations are a Prefect Cloud feature that enables you to configure actions that execute automatically based on trigger conditions.

Potential triggers include the occurrence of events from changes in a flow run’s state—or the absence of such events. You can even define your own custom trigger to fire based on an event created from a webhook or a custom event defined in Python code.

Actions include starting flow runs, pausing schedules, and sending custom notifications.

Prerequisites

Create an automation

On the Automations page, select the + icon to create a new automation. You’ll be prompted to configure:

  • A trigger condition that causes the automation to execute.
  • One or more actions carried out by the automation.
  • Details about the automation, such as a name and description.

Manage automations

The Automations page provides an overview of all configured automations for your workspace.

Viewing automations for a workspace in Prefect Cloud.

Select the toggle next to an automation to pause execution of the automation.

The button next to the toggle provides commands to copy the automation ID, edit the automation, or delete the automation.

Select the name of an automation to view Details about it and relevant Events.

Triggers

Triggers specify the conditions under which your action should be performed. The Prefect UI includes templates for many common conditions, such as:

  • Flow run state change (Flow Run Tags are only evaluated with OR criteria)
  • Work pool status
  • Work queue status
  • Deployment status
  • Metric thresholds, such as average duration, lateness, or completion percentage
  • Incident declarations (available on Pro and Enterprise plans)
  • Custom event triggers

Automations API

The automations API enables further programmatic customization of trigger and action policies based on arbitrary events.

Importantly, you can configure the triggers not only in reaction to events, but also proactively: in the absence of an expected event.

Configuring a trigger for an automation in Prefect Cloud.

For example, in the case of flow run state change triggers, you might expect production flows to finish in no more than 30 minutes. But transient infrastructure or network issues could cause a flow to get “stuck” in a running state. A trigger could kick off an action if the flow run stays in a running state for more than 30 minutes.

This action could be taken on the flow itself, such as cancelling or restarting it. Or the action could take the form of a notification for someone to take manual remediation steps. Or you could set both actions to take place when the trigger occurs.
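The proactive case described above can be modelled in a few lines of plain Python. This is only a simplified sketch of the idea, not Prefect's evaluation logic: the function name and its arguments are hypothetical, and it assumes the window opens when the flow run enters a running state.

```python
from datetime import datetime, timedelta
from typing import Optional


def proactive_trigger_fires(
    entered_running_at: datetime,
    left_running_at: Optional[datetime],
    within: timedelta,
    now: datetime,
) -> bool:
    """Return True if the expected follow-up event did not arrive in time.

    Simplified model (an assumption, not Prefect's implementation): the
    window opens when the run enters Running, and the trigger fires once
    `within` has elapsed without the run leaving that state.
    """
    deadline = entered_running_at + within
    if left_running_at is not None and left_running_at <= deadline:
        return False  # the expected event arrived inside the window
    return now >= deadline


start = datetime(2024, 1, 1, 12, 0)
window = timedelta(minutes=30)
check_time = start + timedelta(minutes=45)

# A run that finished after 10 minutes does not fire the trigger.
finished_in_time = proactive_trigger_fires(
    start, start + timedelta(minutes=10), window, check_time
)
# A run that is still running after 45 minutes does.
still_stuck = proactive_trigger_fires(start, None, window, check_time)
print(finished_in_time, still_stuck)
```

A reactive trigger is the mirror image: it responds when an event is seen, rather than when an expected event is missing.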

Actions

Actions specify what your automation does when its trigger criteria are met. Current action types include:

  • Cancel a flow run
  • Pause or resume a schedule
  • Run a deployment
  • Pause or resume a deployment schedule
  • Pause or resume a work pool
  • Pause or resume a work queue
  • Pause or resume an automation
  • Send a notification
  • Call a webhook
  • Suspend a flow run
  • Declare an incident (available on Pro and Enterprise plans)
  • Change the state of a flow run

Configuring an action for an automation in Prefect Cloud.

Create automations in Python code

You can create and access any automation with the Python SDK’s Automation class and its methods.

from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger
from prefect.server.events.actions import CancelFlowRun

# creating an automation
automation = Automation(
    name="woodchonk",
    trigger=EventTrigger(
        expect={"animal.walked"},
        match={
            "genus": "Marmota",
            "species": "monax",
        },
        posture="Reactive",
        threshold=3,
    ),
    actions=[CancelFlowRun()],
).create()
print(automation)
# name='woodchonk' description='' enabled=True trigger=EventTrigger(type='event', match=ResourceSpecification(__root__={'genus': 'Marmota', 'species': 'monax'}), match_related=ResourceSpecification(__root__={}), after=set(), expect={'animal.walked'}, for_each=set(), posture=Posture.Reactive, threshold=3, within=datetime.timedelta(seconds=10)) actions=[CancelFlowRun(type='cancel-flow-run')] actions_on_trigger=[] actions_on_resolve=[] owner_resource=None id=UUID('d641c552-775c-4dc6-a31e-541cb11137a6')

# reading the automation by ID
automation = Automation.read(id="d641c552-775c-4dc6-a31e-541cb11137a6")

# or reading it by name
automation = Automation.read(name="woodchonk")

print(automation)
# name='woodchonk' description='' enabled=True trigger=EventTrigger(type='event', match=ResourceSpecification(__root__={'genus': 'Marmota', 'species': 'monax'}), match_related=ResourceSpecification(__root__={}), after=set(), expect={'animal.walked'}, for_each=set(), posture=Posture.Reactive, threshold=3, within=datetime.timedelta(seconds=10)) actions=[CancelFlowRun(type='cancel-flow-run')] actions_on_trigger=[] actions_on_resolve=[] owner_resource=None id=UUID('d641c552-775c-4dc6-a31e-541cb11137a6')

Selected and inferred action targets

Some actions require you to either select the target of the action or specify that the target should be inferred. Selected targets are simple and useful when you know exactly which object your action should act on, such as a cleanup flow you want to run or a specific notification you want to send.

Inferred targets are deduced from the trigger itself.

For example, if a trigger fires on a flow run that is stuck in a running state and the action is to cancel an inferred flow run, the inferred flow run is the one that caused the trigger to fire.

Similarly, if a trigger fires on a work queue event and the corresponding action is to pause an inferred work queue, the inferred work queue is the one that emitted the event.

Prefect infers the relevant event whenever possible, but sometimes one does not exist.
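The difference between selected and inferred targets can be illustrated with a toy resolver. This is a sketch under stated assumptions, not how Prefect resolves targets internally: the function name, the flow_run_id key, and the "inferred" source value are hypothetical choices for illustration, and the event is reduced to a plain dictionary.

```python
from typing import Optional

FLOW_RUN_PREFIX = "prefect.flow-run."


def resolve_flow_run_target(action: dict, event: dict) -> Optional[str]:
    """Pick the flow run an action should act on (toy model)."""
    if action.get("source") == "selected":
        # Selected: the target was chosen explicitly when the action was configured.
        return action.get("flow_run_id")

    # Inferred: fall back to the resource that emitted the triggering event.
    resource_id = event.get("resource", {}).get("prefect.resource.id", "")
    if resource_id.startswith(FLOW_RUN_PREFIX):
        return resource_id[len(FLOW_RUN_PREFIX):]

    # Sometimes no relevant resource exists, so there is nothing to infer.
    return None


flow_run_event = {"resource": {"prefect.resource.id": "prefect.flow-run.abc123"}}
work_queue_event = {"resource": {"prefect.resource.id": "prefect.work-queue.q1"}}

inferred = resolve_flow_run_target({"source": "inferred"}, flow_run_event)
selected = resolve_flow_run_target(
    {"source": "selected", "flow_run_id": "xyz789"}, flow_run_event
)
missing = resolve_flow_run_target({"source": "inferred"}, work_queue_event)
print(inferred, selected, missing)
```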

Specify a name and, optionally, a description for the automation.

Create an automation with deployment triggers

To enable the simple configuration of event-driven deployments, Prefect provides deployment triggers—a shorthand for creating automations that are linked to specific deployments to run them based on the presence or absence of events.

Trigger definitions for deployments are supported in prefect.yaml, .serve, and .deploy. At deployment time, specified trigger definitions create linked automations triggered by events matching your chosen grammar. Each trigger definition may include a Jinja template to render the triggering event as the parameters of your deployment’s flow run.

Define triggers in prefect.yaml

You can include a list of triggers on any deployment in a prefect.yaml file:

deployments:
  - name: my-deployment
    entrypoint: path/to/flow.py:decorated_fn
    work_pool:
      name: my-work-pool
    triggers:
      - type: event
        enabled: true
        match:
          prefect.resource.id: my.external.resource
        expect:
          - external.resource.pinged
        parameters:
          param_1: "{{ event }}"

This deployment creates a flow run when an external.resource.pinged event and an external.resource.replied event have been seen from my.external.resource:

deployments:
  - name: my-deployment
    entrypoint: path/to/flow.py:decorated_fn
    work_pool:
      name: my-work-pool
    triggers:
      - type: compound
        require: all
        parameters:
          param_1: "{{ event }}"
        triggers:
          - type: event
            match:
              prefect.resource.id: my.external.resource
            expect:
              - external.resource.pinged
          - type: event
            match:
              prefect.resource.id: my.external.resource
            expect:
              - external.resource.replied
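To make the meaning of require: all concrete, the compound trigger above can be modelled as a small check over the events seen so far. This is a simplified sketch, not Prefect's evaluation engine: the function names and the flat event dictionaries are hypothetical, and time windows are ignored.

```python
def child_is_satisfied(child: dict, events: list) -> bool:
    """True if at least one seen event matches this child trigger."""
    wanted_id = child["match"]["prefect.resource.id"]
    return any(
        event["resource_id"] == wanted_id and event["event"] in child["expect"]
        for event in events
    )


def compound_all_fires(children: list, events: list) -> bool:
    """Model of `require: all`: every child trigger must be satisfied."""
    return all(child_is_satisfied(child, events) for child in children)


children = [
    {
        "match": {"prefect.resource.id": "my.external.resource"},
        "expect": ["external.resource.pinged"],
    },
    {
        "match": {"prefect.resource.id": "my.external.resource"},
        "expect": ["external.resource.replied"],
    },
]

only_ping = [
    {"resource_id": "my.external.resource", "event": "external.resource.pinged"},
]
ping_and_reply = only_ping + [
    {"resource_id": "my.external.resource", "event": "external.resource.replied"},
]

fires_on_ping_only = compound_all_fires(children, only_ping)
fires_on_both = compound_all_fires(children, ping_and_reply)
print(fires_on_ping_only, fires_on_both)
```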

Define triggers in .serve and .deploy

To create deployments with triggers in Python, the trigger types DeploymentEventTrigger, DeploymentMetricTrigger, DeploymentCompoundTrigger, and DeploymentSequenceTrigger can be imported from prefect.events:

from prefect import flow
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def decorated_fn(param_1: str):
    print(param_1)


if __name__=="__main__":
    decorated_fn.serve(
        name="my-deployment",
        triggers=[
            DeploymentEventTrigger(
                enabled=True,
                match={"prefect.resource.id": "my.external.resource"},
                expect=["external.resource.pinged"],
                parameters={
                    "param_1": "{{ event }}",
                },
            )
        ],
    )

As with prior examples, you must supply composite triggers with a list of underlying triggers:

from prefect import flow
from prefect.events import DeploymentCompoundTrigger


@flow(log_prints=True)
def decorated_fn(param_1: str):
    print(param_1)


if __name__=="__main__":
    decorated_fn.deploy(
        name="my-deployment",
        image="my-image-registry/my-image:my-tag",
        triggers=[
            DeploymentCompoundTrigger(
                enabled=True,
                name="my-compound-trigger",
                require="all",
                triggers=[
                    {
                      "type": "event",
                      "match": {"prefect.resource.id": "my.external.resource"},
                      "expect": ["external.resource.pinged"],
                    },
                    {
                      "type": "event",
                      "match": {"prefect.resource.id": "my.external.resource"},
                      "expect": ["external.resource.replied"],
                    },
                ],
                parameters={
                    "param_1": "{{ event }}",
                },
            )
        ],
        work_pool_name="my-work-pool",
    )

Pass triggers to prefect deploy

You can pass one or more --trigger arguments to prefect deploy as either a JSON string or a path to a .yaml or .json file.

# Pass a trigger as a JSON string
prefect deploy -n test-deployment \
  --trigger '{
    "enabled": true,
    "match": {
      "prefect.resource.id": "prefect.flow-run.*"
    },
    "expect": ["prefect.flow-run.Completed"]
  }'

# Pass a trigger using a JSON/YAML file
prefect deploy -n test-deployment --trigger triggers.yaml
prefect deploy -n test-deployment --trigger my_stuff/triggers.json

For example, a triggers.yaml file could have many triggers defined:

triggers:
  - enabled: true
    match:
      prefect.resource.id: my.external.resource
    expect:
      - external.resource.pinged
    parameters:
      param_1: "{{ event }}"
  - enabled: true
    match:
      prefect.resource.id: my.other.external.resource
    expect:
      - some.other.event
    parameters:
      param_1: "{{ event }}"

Both of the above triggers would be attached to test-deployment after running prefect deploy.
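If you prefer to generate the trigger file rather than write it by hand, a short script can produce the JSON variant. This is a minimal sketch that assumes a .json file mirrors the layout of the triggers.yaml example above; the write_triggers_file helper is hypothetical and not part of Prefect.

```python
import json
import tempfile
from pathlib import Path


def write_triggers_file(path: Path, resource_events: dict) -> Path:
    """Write one trigger per (resource id, event name) pair to a JSON file."""
    triggers = [
        {
            "enabled": True,
            "match": {"prefect.resource.id": resource_id},
            "expect": [event_name],
            "parameters": {"param_1": "{{ event }}"},
        }
        for resource_id, event_name in resource_events.items()
    ]
    path.write_text(json.dumps({"triggers": triggers}, indent=2))
    return path


# Write to a temporary directory so the example leaves no files behind.
with tempfile.TemporaryDirectory() as tmp_dir:
    triggers_path = write_triggers_file(
        Path(tmp_dir) / "triggers.json",
        {
            "my.external.resource": "external.resource.pinged",
            "my.other.external.resource": "some.other.event",
        },
    )
    loaded = json.loads(triggers_path.read_text())

print(json.dumps(loaded, indent=2))
```

The resulting file could then be passed with --trigger in the same way as the hand-written files shown earlier.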

Triggers passed to prefect deploy will override any triggers defined in prefect.yaml

While you can define triggers in prefect.yaml for a given deployment, triggers passed to prefect deploy take precedence over those defined in prefect.yaml.

Note that deployment triggers contribute to the total number of automations in your workspace.

Sending notifications with automations

Automations support sending notifications through any predefined block that is capable of and configured to send a message, including:

  • Slack message to a channel
  • Microsoft Teams message to a channel
  • Email to an email address

Configuring notifications for an automation in Prefect Cloud.

Templating with Jinja

You can access templated variables with automation actions through Jinja syntax. Templated variables enable you to dynamically include details from an automation trigger, such as a flow or pool name.

Jinja templated variable syntax wraps the variable name in double curly brackets, like this: {{ variable }}.

You can access properties of the underlying flow run objects including:

In addition to its native properties, each object includes an id along with created and updated timestamps.

The flow_run|ui_url token returns the URL to view the flow run in Prefect Cloud.

Here’s an example of a template for a flow run state-based notification:

Flow run {{ flow_run.name }} entered state {{ flow_run.state.name }}.

    Timestamp: {{ flow_run.state.timestamp }}
    Flow ID: {{ flow_run.flow_id }}
    Flow Run ID: {{ flow_run.id }}
    State message: {{ flow_run.state.message }}

The resulting Slack webhook notification looks something like this:

Configuring notifications for an automation in Prefect Cloud.

You could include flow and deployment properties:

Flow run {{ flow_run.name }} for flow {{ flow.name }}
entered state {{ flow_run.state.name }}
with message {{ flow_run.state.message }}

Flow tags: {{ flow_run.tags }}
Deployment name: {{ deployment.name }}
Deployment version: {{ deployment.version }}
Deployment parameters: {{ deployment.parameters }}

An automation that reports on work pool status might include notifications using work_pool properties:

Work pool status alert!

Name: {{ work_pool.name }}
Last polled: {{ work_pool.last_polled }}

In addition to those shortcuts for flows, deployments, and work pools, you have access to the automation and the event that triggered the automation. See the Automations API for additional details.

Automation: {{ automation.name }}
Description: {{ automation.description }}

Event: {{ event.id }}
Resource:
{% for label, value in event.resource %}
{{ label }}: {{ value }}
{% endfor %}
Related Resources:
{% for related in event.related %}
    Role: {{ related.role }}
    {% for label, value in related %}
    {{ label }}: {{ value }}
    {% endfor %}
{% endfor %}

Note that this example also illustrates the ability to use Jinja features such as iterator and for loop control structures when templating notifications.
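To see how dotted names such as flow_run.state.name resolve against nested objects, here is a toy renderer written with the standard library only. It is not Jinja and does not reflect how Prefect renders templates; it handles simple {{ name }} placeholders and nothing else, and the context values are invented for illustration.

```python
import re
from typing import Any

# Matches placeholders such as {{ flow_run.name }}
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(context: dict, dotted_name: str) -> Any:
    """Walk a dotted name through nested dictionaries or object attributes."""
    value: Any = context
    for part in dotted_name.split("."):
        value = value[part] if isinstance(value, dict) else getattr(value, part)
    return value


def render(template: str, context: dict) -> str:
    """Replace each placeholder with the value it names in the context."""
    return PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), template)


# Hypothetical context values standing in for a real flow run.
context = {
    "flow_run": {
        "name": "crimson-fox",
        "state": {"name": "Completed", "message": "All states completed."},
    }
}

message = render(
    "Flow run {{ flow_run.name }} entered state {{ flow_run.state.name }}.",
    context,
)
print(message)
```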

API example

This example grabs data from an API and sends a notification based on the end state.

Create the example script

Start by pulling hypothetical user data from an endpoint and then performing data cleaning and transformations.

First create a simple extract method that pulls the data from a random user data generator endpoint:

from prefect import flow, task, get_run_logger
import requests
import json

@task
def fetch(url: str):
    logger = get_run_logger()
    response = requests.get(url)
    raw_data = response.json()
    logger.info(f"Raw response: {raw_data}")
    return raw_data

@task
def clean(raw_data: dict):
    print(raw_data.get('results')[0])
    results = raw_data.get('results')[0]
    logger = get_run_logger()
    logger.info(f"Cleaned results: {results}")
    return results['name']

@flow
def build_names(num: int = 10):
    df = []
    url = "https://randomuser.me/api/"
    logger = get_run_logger()
    copy = num
    while num != 0:
        raw_data = fetch(url)
        df.append(clean(raw_data))
        num -= 1
    logger.info(f"Built {copy} names: {df}")
    return df

if __name__ == "__main__":
    list_of_names = build_names()

The data cleaning workflow has visibility into each step, and sends a list of names to the next step of the pipeline.

Create a notification block in the UI

Next, send a notification based on a completed state outcome. Configure a notification that shows when to look into your workflow logic.

  1. Prior to creating the automation, confirm the notification location. Create a notification block to help define where the notification is sent.

  2. Navigate to the blocks page in the UI, and create an email notification block.

  3. Go to the automations page to create your first automation.

  4. Next, find the trigger type. In this case, use a flow completion.

  5. Create the actions for when the trigger is hit. In this case, create a notification to showcase the completion.

  6. Now the automation is ready to be triggered from a flow run completion. Run the file locally and see that the notification is sent to your inbox after the completion. It may take a few minutes for the notification to arrive.

No deployment created

You do not need to create a deployment to trigger your automation. In the case above, the flow run state trigger fired in response to a flow run that was executed locally.

Now that you’ve seen how to create an email notification from a flow run completion, see how to kick off a deployment run in response to an event.

Event-based deployment automation

Create an automation to kick off a deployment instead of a notification. Explore how to programmatically create this automation with Prefect’s REST API.

See the REST API documentation as a reference for interacting with the Prefect Cloud automation endpoints.

Create a deployment to kick off some work based on how long a flow is running. For example, if the build_names flow takes too long to execute, you can kick off a deployment with the same build_names flow, but replace the count value with a lower number to speed up completion. Create a deployment with a prefect.yaml file or a Python file that uses flow.deploy.

Create a prefect.yaml file like this one for our flow build_names:

  # Welcome to your prefect.yaml file! You can use this file for storing and managing
  # configuration for deploying your flows. We recommend committing this file to source
  # control along with your flow code.

  # Generic metadata about this project
  name: automations-guide
  prefect-version: 2.13.1

  # build section allows you to manage and build docker images
  build: null

  # push section allows you to manage if and how this project is uploaded to remote locations
  push: null

  # pull section allows you to provide instructions for cloning this project in remote locations
  pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /Users/src/prefect/Playground/automations-guide

  # the deployments section allows you to provide configuration for deploying flows
  deployments:
  - name: deploy-build-names
    version: null
    tags: []
    description: null
    entrypoint: test-automations.py:build_names
    parameters: {}
    work_pool:
      name: tutorial-process-pool
      work_queue_name: null
      job_variables: {}
    schedule: null

Grab your deployment_id from this deployment with the CLI and embed it in your automation.

Find deployment_id from the CLI

Run prefect deployment ls in an authenticated command prompt.

prefect deployment ls
                                          Deployments                                           
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Name                                                  ┃ ID                                   ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Extract islands/island-schedule                       │ d9d7289c-7a41-436d-8313-80a044e61532 │
│ build-names/deploy-build-names                        │ 8b10a65e-89ef-4c19-9065-eec5c50242f4 │
│ ride-duration-prediction-backfill/backfill-deployment │ 76dc6581-1773-45c5-a291-7f864d064c57 │
└───────────────────────────────────────────────────────┴──────────────────────────────────────┘

Create the automation programmatically with a POST request to the automations endpoint. Ensure you have your api_key, account_id, and workspace_id.

import requests

# Replace these placeholders with your own values
PREFECT_API_KEY = "YOUR-API-KEY"
account_id = "YOUR-ACCOUNT-ID"
workspace_id = "YOUR-WORKSPACE-ID"

def create_event_driven_automation():
    api_url = f"https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/automations/"
    data = {
        "name": "Event Driven Redeploy",
        "description": "Programmatically created an automation to redeploy a flow based on an event",
        "enabled": True,  # a boolean, not the string "true"
        "trigger": {
            # "string" is a placeholder; replace it with a real event name
            "after": [
                "string"
            ],
            "expect": [
                "prefect.flow-run.Running"
            ],
            "for_each": [
                "prefect.resource.id"
            ],
            "posture": "Proactive",
            "threshold": 30,
            "within": 0
        },
        "actions": [
            {
                "type": "run-deployment",
                "source": "selected",
                "deployment_id": "YOUR-DEPLOYMENT-ID",
                # parameters map flow parameter names to values; build_names takes num
                "parameters": {"num": 10}
            }
        ],
        "owner_resource": "string"  # placeholder
    }

    headers = {"Authorization": f"Bearer {PREFECT_API_KEY}"}
    response = requests.post(api_url, headers=headers, json=data)

    print(response.json())
    return response.json()

After running this function, the new automation appears in the UI. Keep in mind that the UI lists the trigger as “custom”.

Run the underlying flow and see the deployment kick off after 30 seconds. This results in a new flow run of build_names. You can see this new deployment get initiated with the custom parameters outlined above.

In a few quick changes, you can programmatically create an automation that deploys workflows with custom parameters.

Use an underlying .yaml file

You can take this a step further by declaring the automation in its own .yaml file and registering that file with the API. Keeping the definition in a .yaml file simplifies the automation’s requirements.

Start by creating the .yaml file to house the automation requirements:

automation.yaml
name: Cancel long running flows
description: Cancel any flow run after an hour of execution
trigger:
  match:
    "prefect.resource.id": "prefect.flow-run.*"
  match_related: {}
  after:
    - "prefect.flow-run.Failed"
  expect:
    - "prefect.flow-run.*"
  for_each:
    - "prefect.resource.id"
  posture: "Proactive"
  threshold: 1
  within: 30
actions:
  - type: "cancel-flow-run"

Make a helper function that applies this YAML file with the REST API function.

import yaml

from utils import post, put

def create_or_update_automation(path: str = "automation.yaml"):
    """Create or update an automation from a local YAML file"""
    # Load the definition
    with open(path, "r") as fh:
        payload = yaml.safe_load(fh)

    # Find existing automations by name
    automations = post("/automations/filter")
    existing_automation = [a["id"] for a in automations if a["name"] == payload["name"]]
    automation_exists = len(existing_automation) > 0

    # Create or update the automation
    if automation_exists:
        print(f"Automation '{payload['name']}' already exists and will be updated")
        put(f"/automations/{existing_automation[0]}", payload=payload)
    else:
        print(f"Creating automation '{payload['name']}'")
        post("/automations/", payload=payload)

if __name__ == "__main__":
    create_or_update_automation()

Find a complete repo with these API examples in this GitHub repository.

In this example, you created the automation by registering the .yaml file with a helper function.
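The helper above imports post and put from a utils module that is not shown. One way such a module might look is sketched below using only the standard library. Everything here is an assumption for illustration: the function names match the import, but the real module in the referenced repository may differ. The sketch reads PREFECT_API_URL and PREFECT_API_KEY from the environment unless values are passed explicitly.

```python
import json
import os
import urllib.request
from typing import Any, Optional


def build_request(
    method: str,
    path: str,
    payload: Optional[dict] = None,
    api_url: Optional[str] = None,
    api_key: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) an authenticated JSON request."""
    api_url = api_url or os.environ["PREFECT_API_URL"]
    api_key = api_key or os.environ["PREFECT_API_KEY"]
    return urllib.request.Request(
        url=api_url.rstrip("/") + path,
        data=json.dumps(payload or {}).encode("utf-8"),
        method=method,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )


def _send(request: urllib.request.Request) -> Any:
    """Send the request and decode the JSON response body, if any."""
    with urllib.request.urlopen(request) as response:
        body = response.read()
    return json.loads(body) if body else None


def post(path: str, payload: Optional[dict] = None) -> Any:
    return _send(build_request("POST", path, payload))


def put(path: str, payload: Optional[dict] = None) -> Any:
    return _send(build_request("PUT", path, payload))
```

Separating build_request from the sending step keeps the URL and header logic easy to check without making a network call.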

Kick off an automation with a custom webhook

Use webhooks to expose the events API. This allows you to extend the functionality of deployments and respond to changes in your workflow.

By exposing a webhook endpoint, you can kick off workflows that trigger deployments, all from an event created from an HTTP request.

Create this webhook in the UI to create these dynamic events.

{
    "event": "model-update",
    "resource": {
        "prefect.resource.id": "product.models.{{ body.model_id}}",
        "prefect.resource.name": "{{ body.friendly_name }}",
        "run_count": "{{body.run_count}}"
    }
}

From this input, you can create an exposed webhook endpoint.


Each webhook corresponds to a custom event created where you can react to it downstream with a separate deployment or automation.

For example, you can create a curl request that sends the endpoint information such as a run count for your deployment:

curl -X POST https://api.prefect.cloud/hooks/34iV2SFke3mVa6y5Y-YUoA -d "model_id=adhoc" -d "run_count=10" -d "friendly_name=test-user-input"
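To see what event this request would produce, the webhook template can be mirrored in plain Python. This is only an illustration of the mapping from the form body to the event fields, not how Prefect Cloud renders webhook templates; the function name is hypothetical.

```python
from urllib.parse import parse_qs


def event_from_form_body(raw_body: str) -> dict:
    """Apply the webhook template above to a form-encoded request body."""
    # parse_qs returns a list per key; keep the first value of each.
    body = {key: values[0] for key, values in parse_qs(raw_body).items()}
    return {
        "event": "model-update",
        "resource": {
            "prefect.resource.id": f"product.models.{body['model_id']}",
            "prefect.resource.name": body["friendly_name"],
            "run_count": body["run_count"],
        },
    }


# curl joins repeated -d arguments with "&" before sending them.
raw_body = "model_id=adhoc&run_count=10&friendly_name=test-user-input"
event = event_from_form_body(raw_body)
print(event)
```

Note that run_count arrives as the string "10", since form-encoded values are text.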

From here, you can create a webhook that pulls in the parameters sent with the curl command and kicks off a deployment that uses them.

Go into the event feed to automate straight from this event.

This allows you to create automations that respond to these webhook events. With a few clicks in the UI, you can associate an external process with the Prefect events API that can trigger downstream deployments.

Use triggers

Explore event triggers that automate the kickoff of a deployment run.

You can create a deployment that triggers when a flow run takes longer than expected.

Use the Marvin library

Take advantage of Prefect’s Marvin library that uses an LLM to classify the data. You can use Marvin’s AI functions to make the dataset more information rich.

Install Marvin with pip install marvin and set your OpenAI API key.

You can add a trigger to run a deployment in response to a specific event.

Here’s an example with Marvin’s AI functions. It takes in the data you have already built and uses an AI function to generate additional dummy data based on it.

from marvin import ai_fn
from prefect import flow
from prefect.artifacts import create_table_artifact

@ai_fn
def generate_synthetic_user_data(build_of_names: list[dict]) -> list:
    """
    Generate additional data for userID (numerical values with 6 digits), location, and timestamp as separate columns and 
    append the data onto 'build_of_names'. Make userID the first column
    """

@flow
def create_fake_user_dataset(df):
    artifact_df = generate_synthetic_user_data(df)
    print(artifact_df)

    # Publish the generated rows as a table artifact
    create_table_artifact(
        key="fake-user-data",
        table=artifact_df,
        description="Dataset that is comprised of a mix of autogenerated data based on user data",
    )

if __name__ == "__main__":
    # Illustrative input shaped like the name records returned by build_names
    sample_names = [{"title": "Ms", "first": "Ada", "last": "Lovelace"}]
    create_fake_user_dataset(sample_names)

Kick off a deployment with a trigger defined in a prefect.yaml file. Specify what to trigger when a flow run stays in a running state for longer than 30 seconds.

# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: automations-guide
prefect-version: 2.13.1

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push: null

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /Users/src/prefect/Playground/marvin-extension

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: create-fake-user-dataset
  triggers:
    - enabled: true
      match:
        prefect.resource.id: "prefect.flow-run.*"
      after: ["prefect.flow-run.Running"]
      expect: []
      for_each: ["prefect.resource.id"]
      parameters:
        param_1: 10
      posture: "Proactive"
  version: null
  tags: []
  description: null
  entrypoint: marvin-extension.py:create_fake_user_dataset
  parameters: {}
  work_pool:
    name: tutorial-process-pool
    work_queue_name: null
    job_variables: {}
  schedule: null

See also

  • To learn more about Prefect events, which can trigger automations, see the events docs.
  • See the webhooks guide to learn how to create webhooks and receive external events.