Overview

The PrefectClient offers methods to simplify common operations against Prefect’s REST API that may not be abstracted away by the SDK.

For example, to reschedule flow runs, one might use methods like:

  • read_flow_runs with a FlowRunFilter to read certain flow runs
  • create_flow_run_from_deployment to schedule new flow runs
  • delete_flow_run to delete a very Late flow run

Getting a client

By default, get_client() returns an asynchronous client to be used as a context manager, but you may also use a synchronous client.

from prefect import get_client

async with get_client() as client:
    response = await client.hello()
    print(response.json()) # 👋

Configure custom headers

You can configure custom HTTP headers to be sent with every API request by setting the PREFECT_CLIENT_CUSTOM_HEADERS setting. This is useful for adding authentication headers, API keys, or other custom headers required by proxies, CDNs, or security systems.

Setting custom headers

Custom headers can be configured via environment variables or settings. The headers are specified as key-value pairs in JSON format.

export PREFECT_CLIENT_CUSTOM_HEADERS='{"CF-Access-Client-Id": "your-client-id", "CF-Access-Client-Secret": "your-secret"}'

Protected headers

Certain headers are protected and cannot be overridden by custom headers for security reasons:

  • User-Agent - Managed by Prefect to identify client version
  • Prefect-Csrf-Token - Used for CSRF protection
  • Prefect-Csrf-Client - Used for CSRF protection

If you attempt to override these headers, Prefect will log a warning and ignore the custom header value.

Examples

These examples are meant to illustrate how one might develop their own utilities for interacting with the API.

If you believe a client method is missing, or you’d like to see a specific pattern better represented in the SDK generally, please open an issue.

Reschedule late flow runs

To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a Scheduled state with a delay. This is useful if you accidentally scheduled many flow runs of a deployment to an inactive work pool, for example.

The following example reschedules the last three late flow runs of a deployment named healthcheck-storage-test to run six hours later than their original expected start time. It also deletes any remaining late flow runs of that deployment.

First, define the rescheduling function:

async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
    states: list[str] | None = None
) -> list[FlowRun]:
    states = states or ["Late"]

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            limit=most_recent_n if not delete_remaining else None
        )

        rescheduled_flow_runs: list[FlowRun] = []
        for i, run in enumerate(flow_runs):
            await client.delete_flow_run(flow_run_id=run.id)
            if i < most_recent_n:
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=run.deployment_id,
                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
                )
                rescheduled_flow_runs.append(new_run)
            
        return rescheduled_flow_runs

Then use it to reschedule flows:

rescheduled_flow_runs = asyncio.run(
    reschedule_late_flow_runs(
        deployment_name="healthcheck-storage-test",
        delay=timedelta(hours=6),
        most_recent_n=3,
    )
)

Get the last N completed flow runs from your workspace

To get the last N completed flow runs from your workspace, use read_flow_runs and prefect.client.schemas.

This example gets the last three completed flow runs from your workspace:

async def get_most_recent_flow_runs(
    n: int,
    states: list[str] | None = None
) -> list[FlowRun]:    
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state={'type': {'any_': states or ["COMPLETED"]}}
            ),
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )

Use it to get the last 3 completed runs:

flow_runs: list[FlowRun] = asyncio.run(
    get_most_recent_flow_runs(n=3)
)

Instead of the last three from the whole workspace, you can also use the DeploymentFilter to get the last three completed flow runs of a specific deployment.

Transition all running flows to cancelled through the Client

Use get_clientto set multiple runs to a Cancelled state. This example cancels all flow runs that are in Pending, Running, Scheduled, or Late states when the script is run.

async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )

async def cancel_flow_runs(flow_runs: list[FlowRun]):
    async with get_client() as client:
        for flow_run in flow_runs:
            state = flow_run.state.copy(
                update={"name": "Cancelled", "type": StateType.CANCELLED}
            )
            await client.set_flow_run_state(flow_run.id, state, force=True)

Cancel all pending, running, scheduled or late flows:

async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)

    while flow_runs:
        print(f"Cancelling {len(flow_runs)} flow runs")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)

asyncio.run(bulk_cancel_flow_runs())

Create, read, or delete artifacts

Create, read, or delete artifacts programmatically through the Prefect REST API. With the Artifacts API, you can automate the creation and management of artifacts as part of your workflow.

For example, to read the five most recently created Markdown, table, and link artifacts, you can run the following:

fixture:mock_post_200
import requests


PREFECT_API_URL="https://api.prefect.cloud/api/accounts/abc/workspaces/xyz"
PREFECT_API_KEY="pnu_ghijk"
data = {
    "sort": "CREATED_DESC",
    "limit": 5,
    "artifacts": {
        "key": {
            "exists_": True
        }
    }
}

headers = {"Authorization": f"Bearer {PREFECT_API_KEY}"}
endpoint = f"{PREFECT_API_URL}/artifacts/filter"

response = requests.post(endpoint, headers=headers, json=data)
assert response.status_code == 200
for artifact in response.json():
    print(artifact)

If you don’t specify a key or that a key must exist, you will also return results, which are a type of key-less artifact.

See the Prefect REST API documentation on artifacts for more information.