The PrefectClient contains many methods that make it simpler to perform actions, such as:

  • reschedule late flow runs
  • get the last N completed flow runs from a workspace

The PrefectClient is an async context manager. Here’s an example usage:

from prefect import get_client


# `async with` / `await` must run inside a coroutine (or an async-aware REPL).
# The client is used as an async context manager, so it is closed on exit.
async with get_client() as client:
    # NOTE(review): hello() is presumably a lightweight connectivity check
    # against the API — confirm against the client reference.
    response = await client.hello()
    print(response.json()) # 👋

Examples

Reschedule late flow runs

To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a Scheduled state with a delay. This is useful if you accidentally scheduled many flow runs of a deployment to an inactive work pool, for example.

The following example reschedules the last three late flow runs of a deployment named healthcheck-storage-test to run six hours later than their original expected start time. It also deletes any remaining late flow runs of that deployment.

import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional


from prefect import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter, FlowRunFilter
)
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort
from prefect.states import Scheduled

async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
    states: Optional[list[str]] = None
) -> list[FlowRun]:
    """Replace late flow runs of a deployment with delayed, rescheduled runs.

    Reads flow runs whose state name is in ``states`` and whose expected
    start time is already in the past, restricted to deployments whose name
    matches ``deployment_name`` through a ``like_`` filter. The first
    ``most_recent_n`` runs returned are each replaced by a new run created
    from the same deployment in a ``Scheduled`` state at
    ``expected_start_time + delay``. Every run that was read is deleted.

    Args:
        deployment_name: Deployment name passed to the ``like_`` filter.
        delay: Offset added to each original expected start time.
        most_recent_n: Number of leading runs (in query order) to recreate.
        delete_remaining: When true, no ``limit`` is passed to the query, so
            runs beyond the first ``most_recent_n`` are fetched and deleted
            without a replacement. When false, the query is limited to
            ``most_recent_n`` runs and nothing else is touched.
        states: State names to match; defaults to ``["Late"]`` when empty
            or ``None``.

    Returns:
        The newly created flow runs, in creation order. An empty list when
        no matching flow runs were found.
    """
    if not states:
        states = ["Late"]

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(
                    before_=datetime.now(timezone.utc)
                ),
            ),
            deployment_filter=DeploymentFilter(
                name={'like_': deployment_name}
            ),
            sort=FlowRunSort.START_TIME_DESC,
            limit=most_recent_n if not delete_remaining else None
        )

        if not flow_runs:
            print(f"No flow runs found in states: {states!r}")
            return []

        rescheduled_flow_runs = []
        for i, run in enumerate(flow_runs):
            if i < most_recent_n:
                # Create the replacement BEFORE deleting the original: if
                # creation raises, the late run still exists and the script
                # can simply be re-run instead of silently losing the run.
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=run.deployment_id,
                    state=Scheduled(
                        scheduled_time=run.expected_start_time + delay
                    ),
                )
                rescheduled_flow_runs.append(new_run)
            await client.delete_flow_run(flow_run_id=run.id)

        return rescheduled_flow_runs


if __name__ == "__main__":
    # Reschedule the three most recent late runs to start six hours later.
    reschedule_job = reschedule_late_flow_runs(
        deployment_name="healthcheck-storage-test",
        delay=timedelta(hours=6),
        most_recent_n=3,
    )
    rescheduled_flow_runs = asyncio.run(reschedule_job)

    print(f"Rescheduled {len(rescheduled_flow_runs)} flow runs")

    # Sanity checks: every new run is Scheduled ...
    assert all(run.state.is_scheduled() for run in rescheduled_flow_runs)
    # ... and is expected to start in the future.
    assert all(
        run.expected_start_time > datetime.now(timezone.utc)
        for run in rescheduled_flow_runs
    )

Get the last N completed flow runs from your workspace

To get the last N completed flow runs from your workspace, use read_flow_runs together with the filter and sort schemas in prefect.client.schemas.

This example gets the last three completed flow runs from your workspace:

import asyncio
from typing import Optional

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort


async def get_most_recent_flow_runs(
    n: int = 3,
    states: Optional[list[str]] = None
) -> list[FlowRun]:
    """Fetch up to ``n`` flow runs of the given state types, latest end time first.

    Args:
        n: Value passed as the query ``limit``.
        states: State types to match; ``["COMPLETED"]`` is used when this is
            empty or ``None``.

    Returns:
        The flow runs returned by ``read_flow_runs``, sorted with
        ``FlowRunSort.END_TIME_DESC``.
    """
    state_types = states or ["COMPLETED"]

    async with get_client() as client:
        run_filter = FlowRunFilter(state={'type': {'any_': state_types}})
        recent_runs = await client.read_flow_runs(
            flow_run_filter=run_filter,
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )
    return recent_runs


if __name__ == "__main__":
    # Uses the defaults: three runs, COMPLETED state type.
    last_3_flow_runs: list[FlowRun] = asyncio.run(get_most_recent_flow_runs())
    print(last_3_flow_runs)

    # Every returned run must be in a completed state.
    assert all(run.state.is_completed() for run in last_3_flow_runs)

    # The runs must already be ordered by end time, newest first.
    end_times = [run.end_time for run in last_3_flow_runs]
    assert end_times == sorted(end_times, reverse=True)

Instead of the last three from the whole workspace, you can also use the DeploymentFilter to get the last three completed flow runs of a specific deployment.

Transition all running flows to cancelled through the Client

Use get_client to set multiple runs to a Cancelled state. The code below cancels all flow runs that are in Pending, Running, Scheduled, or Late states when the script is run.

import anyio


from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.objects import StateType

async def list_flow_runs_with_states(states: list[str]):
    """Return the flow runs whose state name is any of ``states``.

    Args:
        states: State names passed to the ``any_`` state-name filter.

    Returns:
        Whatever ``client.read_flow_runs`` returns for that filter.
    """
    async with get_client() as client:
        # Build the nested filter from the inside out.
        name_filter = FlowRunFilterStateName(any_=states)
        state_filter = FlowRunFilterState(name=name_filter)
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(state=state_filter)
        )


async def cancel_flow_runs(flow_runs):
    """Set every given flow run to a Cancelled state.

    For each run, a copy of its current state is made with the name and type
    overridden, and that copy is submitted via ``set_flow_run_state`` with
    ``force=True``. Progress is printed per run.

    Args:
        flow_runs: Iterable of flow run objects exposing ``name``, ``id``
            and ``state``.
    """
    # The overrides are identical for every run, so build them once as a
    # plain literal instead of calling setdefault() on a fresh empty dict
    # on each iteration.
    state_updates = {"name": "Cancelled", "type": StateType.CANCELLED}

    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            # Copy the current state, replacing only its name and type.
            state = flow_run.state.copy(update=state_updates)
            # NOTE(review): force=True presumably bypasses server-side
            # orchestration rules — confirm against the client reference.
            await client.set_flow_run_state(flow_run.id, state, force=True)


async def bulk_cancel_flow_runs():
    """Cancel flow runs in Pending, Running, Scheduled or Late states until none remain.

    The matching runs are re-queried after every cancellation pass, and the
    loop ends once a query comes back empty.
    """
    states = ["Pending", "Running", "Scheduled", "Late"]

    # Fetch, cancel, repeat: an empty result ends the loop.
    while flow_runs := await list_flow_runs_with_states(states):
        print(f"Cancelling {len(flow_runs)} flow runs\n")
        await cancel_flow_runs(flow_runs)

    print("Done!")


if __name__ == "__main__":
    # Script entry point: the coroutine function is passed uncalled so that
    # anyio.run can invoke it inside the event loop it starts.
    anyio.run(bulk_cancel_flow_runs)