prefect.workers.base

Classes

BaseJobConfiguration

Methods:

from_template_and_values

from_template_and_values(cls, base_job_template: dict[str, Any], values: dict[str, Any], client: 'PrefectClient | None' = None)
Creates a valid worker configuration object from the provided base configuration and overrides. Important: this method expects that the base_job_template was already validated server-side.
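To make the idea of combining a base template with overrides concrete, here is a minimal sketch. It assumes the template has a `job_configuration` section with `{{ name }}` placeholders and a `variables` section whose `properties` may carry defaults. The helper `resolve_job_configuration` and the sample template are hypothetical and are not Prefect's implementation.

```python
import re
from typing import Any

# Matches a value that consists of exactly one placeholder, e.g. "{{ image }}".
PLACEHOLDER = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def resolve_job_configuration(
    base_job_template: dict[str, Any], values: dict[str, Any]
) -> dict[str, Any]:
    """Hypothetical helper: fill placeholders from schema defaults and overrides."""
    properties = base_job_template["variables"].get("properties", {})
    # Start from schema defaults, then let explicit overrides win.
    variables = {
        name: spec["default"] for name, spec in properties.items() if "default" in spec
    }
    variables.update(values)

    resolved: dict[str, Any] = {}
    for key, template in base_job_template["job_configuration"].items():
        match = PLACEHOLDER.match(template) if isinstance(template, str) else None
        if match is None:
            resolved[key] = template  # literal value, kept as-is
        else:
            resolved[key] = variables.get(match.group(1))  # None when unset
    return resolved


# Illustrative template; the keys are assumptions, not a real work pool template.
template = {
    "job_configuration": {
        "command": "{{ command }}",
        "image": "{{ image }}",
        "stream_output": True,
    },
    "variables": {"properties": {"image": {"default": "python:3.12"}, "command": {}}},
}
config = resolve_job_configuration(template, {"command": "echo hello"})
```

In this sketch, `image` falls back to its schema default while `command` comes from the override.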

is_using_a_runner

is_using_a_runner(self) -> bool

json_template

json_template(cls) -> dict[str, Any]
Returns a dict with job configuration parameter names as keys and the corresponding templates as values. Defaults to using the job configuration parameter name as the template variable name, e.g.:
{
    key1: '{{ key1 }}',     # default variable template
    key2: '{{ template2 }}', # `template2` specifically provided as template
}
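The default naming rule described above can be sketched in a few lines. The function `default_json_template` and its `overrides` argument are hypothetical names used only for illustration; the real method derives this mapping from the configuration class's fields.

```python
from typing import Optional


def default_json_template(
    field_names: list, overrides: Optional[dict] = None
) -> dict:
    """Hypothetical helper: map each field to '{{ field }}' unless a template is given."""
    overrides = overrides or {}
    return {name: overrides.get(name, "{{ " + name + " }}") for name in field_names}


# key1 uses the default variable name; key2 is given an explicit template.
template = default_json_template(["key1", "key2"], {"key2": "{{ template2 }}"})
```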

prepare_for_flow_run

prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: 'DeploymentResponse | None' = None, flow: 'APIFlow | None' = None, work_pool: 'WorkPool | None' = None, worker_name: str | None = None) -> None
Prepare the job configuration for a flow run. This method is called by the worker before starting a flow run. It should be used to set any configuration values that are dependent on the flow run.
Args:
  • flow_run: The flow run to be executed.
  • deployment: The deployment that the flow run is associated with.
  • flow: The flow that the flow run is associated with.
  • work_pool: The work pool that the flow run is running in.
  • worker_name: The name of the worker that is submitting the flow run.
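The sketch below illustrates the kind of flow-run-dependent values such a method might set. It uses stand-in classes rather than Prefect's own: `FakeFlowRun`, `SketchJobConfiguration`, and the environment variable and label keys are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeFlowRun:
    """Stand-in for a flow run; only the fields this sketch needs."""
    id: str
    name: str


@dataclass
class SketchJobConfiguration:
    """Hypothetical configuration; not a BaseJobConfiguration subclass."""
    env: dict = field(default_factory=dict)
    labels: dict = field(default_factory=dict)
    name: Optional[str] = None

    def prepare_for_flow_run(
        self, flow_run: FakeFlowRun, worker_name: Optional[str] = None
    ) -> None:
        # Values already set on the configuration win over run-derived defaults.
        self.env = {"FLOW_RUN_ID": flow_run.id, **self.env}
        self.labels = {"flow-run-name": flow_run.name, **self.labels}
        if worker_name is not None:
            self.labels.setdefault("worker-name", worker_name)
        self.name = self.name or flow_run.name


cfg = SketchJobConfiguration(env={"EXTRA": "1"})
cfg.prepare_for_flow_run(FakeFlowRun(id="abc", name="quick-fox"), worker_name="w1")
```

The method mutates the configuration in place and returns nothing, matching the `-> None` signature above.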

BaseVariables

Methods:

model_json_schema

model_json_schema(cls, by_alias: bool = True, ref_template: str = '#/definitions/{model}', schema_generator: Type[GenerateJsonSchema] = GenerateJsonSchema, mode: Literal['validation', 'serialization'] = 'validation') -> dict[str, Any]
TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?

BaseWorkerResult

BaseWorker

Methods:

client

client(self) -> PrefectClient

get_all_available_worker_types

get_all_available_worker_types() -> list[str]
Returns all worker types available in the local registry.

get_and_submit_flow_runs

get_and_submit_flow_runs(self) -> list['FlowRun']

get_default_base_job_template

get_default_base_job_template(cls) -> dict[str, Any]

get_description

get_description(cls) -> str

get_documentation_url

get_documentation_url(cls) -> str

get_flow_run_logger

get_flow_run_logger(self, flow_run: 'FlowRun') -> PrefectLogAdapter

get_logo_url

get_logo_url(cls) -> str

get_name_slug

get_name_slug(self) -> str

get_status

get_status(self) -> dict[str, Any]
Retrieves the status of the current worker, including its name, current work pool, the work pool queues it is polling, and its local settings.

get_worker_class_from_type

get_worker_class_from_type(type: str) -> Optional[Type['BaseWorker[Any, Any, Any]']]
Returns the worker class for a given worker type. If the worker type is not recognized, returns None.
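A type registry with a `None` fallback can be sketched as follows. This is a simplified stand-in, assuming a plain dictionary registry; the `register` decorator and `ProcessLikeWorker` class are hypothetical, and Prefect's actual registration mechanism may differ.

```python
from typing import Optional

# Hypothetical local registry mapping worker type names to classes.
_REGISTRY: dict = {}


def register(worker_type: str):
    """Class decorator that records a worker class under its type name."""
    def decorator(cls: type) -> type:
        _REGISTRY[worker_type] = cls
        return cls
    return decorator


@register("process-like")
class ProcessLikeWorker:
    """Placeholder worker class used only for this sketch."""


def get_all_available_worker_types() -> list:
    # All type names currently known to the local registry.
    return list(_REGISTRY)


def get_worker_class_from_type(worker_type: str) -> Optional[type]:
    # dict.get returns None for unrecognized types instead of raising.
    return _REGISTRY.get(worker_type)
```

Callers should check for `None` before instantiating the returned class.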

is_worker_still_polling

is_worker_still_polling(self, query_interval_seconds: float) -> bool
This method is invoked by a webserver healthcheck handler and returns a boolean indicating whether the worker has recorded a scheduled flow run poll within a variable amount of time. The query_interval_seconds is the same value that is used by the loop services; the check evaluates whether _last_polled_time falls within that interval x 30 (so 10s -> 5m). The instance property self._last_polled_time is currently set/updated in get_and_submit_flow_runs().
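The threshold arithmetic can be sketched as a standalone function. `is_still_polling` and its `now` parameter are hypothetical names, and treating the boundary as inclusive is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_still_polling(
    last_polled_time: datetime,
    query_interval_seconds: float,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the last poll happened within 30x the query interval."""
    now = now or datetime.now(timezone.utc)
    threshold_seconds = query_interval_seconds * 30  # e.g. 10s -> 300s (5 minutes)
    return (now - last_polled_time).total_seconds() <= threshold_seconds


# Fixed reference time so the example is deterministic.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
recent = is_still_polling(now - timedelta(minutes=4), 10, now=now)
stale = is_still_polling(now - timedelta(minutes=6), 10, now=now)
```

With a 10-second interval, a poll 4 minutes ago is within the 5-minute window and one 6 minutes ago is not.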

limiter

limiter(self) -> anyio.CapacityLimiter

run

run(self, flow_run: 'FlowRun', configuration: C, task_status: Optional[anyio.abc.TaskStatus[int]] = None) -> R
Runs a given flow run on the current worker.

setup

setup(self) -> None
Prepares the worker to run.

start

start(self, run_once: bool = False, with_healthcheck: bool = False, printer: Callable[..., None] = print) -> None
Starts the worker and runs the main worker loops. By default, the worker will run loops to poll for scheduled/cancelled flow runs and sync with the Prefect API server. If run_once is set, the worker will only run each loop once and then return. If with_healthcheck is set, the worker will start a healthcheck server which can be used to determine if the worker is still polling for flow runs and restart the worker if necessary.
Args:
  • run_once: If set, the worker will only run each loop once then return.
  • with_healthcheck: If set, the worker will start a healthcheck server.
  • printer: A print-like function where logs will be reported.
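The `run_once` behavior described above can be illustrated with a small asyncio sketch. The `run_loop` helper, the loop bodies, and the interval values are hypothetical; this shows the pattern only and is not how the worker is implemented.

```python
import asyncio
from typing import Awaitable, Callable


async def run_loop(
    fn: Callable[[], Awaitable[None]], interval_seconds: float, run_once: bool
) -> None:
    """Call fn repeatedly, or exactly once when run_once is set."""
    while True:
        await fn()
        if run_once:
            return
        await asyncio.sleep(interval_seconds)


async def main() -> list:
    calls = []

    async def poll_for_flow_runs() -> None:
        calls.append("poll")

    async def sync_with_backend() -> None:
        calls.append("sync")

    # With run_once=True each loop body executes once and then returns.
    await asyncio.gather(
        run_loop(poll_for_flow_runs, 10, run_once=True),
        run_loop(sync_with_backend, 30, run_once=True),
    )
    return calls


calls = asyncio.run(main())
```

Running each loop once is useful for tests and one-off invocations where a long-lived process is not wanted.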

submit

submit(self, flow: 'Flow[..., FR]', parameters: dict[str, Any] | None = None, job_variables: dict[str, Any] | None = None) -> 'PrefectFlowRunFuture[FR]'
EXPERIMENTAL: The interface for this method is subject to change. Submits a flow to run via the worker.
Args:
  • flow: The flow to submit.
  • parameters: The parameters to pass to the flow.
  • job_variables: Optional job variables for the flow run.
Returns:
  • A future for the submitted flow run (PrefectFlowRunFuture[FR]).

sync_with_backend

sync_with_backend(self) -> None
Updates the worker’s local information about its current work pool and queues. Sends a worker heartbeat to the API.

teardown

teardown(self, *exc_info: Any) -> None
Cleans up resources after the worker is stopped.

work_pool

work_pool(self) -> WorkPool