prefect.workers.base

Classes

BaseJobConfiguration

Methods:

is_using_a_runner

is_using_a_runner(self) -> bool

json_template

json_template(cls) -> dict[str, Any]

Returns a dict whose keys are the job configuration parameter names and whose values are the corresponding templates.

Defaults to using the job configuration parameter name as the template variable name.

e.g.

{
    "key1": "{{ key1 }}",      # default variable template
    "key2": "{{ template2 }}", # `template2` specifically provided as template
}
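
The default-versus-explicit mapping above can be sketched without Prefect installed. This is a minimal illustration, not Prefect's implementation: `build_json_template` and its `overrides` argument are hypothetical names introduced here.

```python
from typing import Any, Dict, List, Optional


def build_json_template(
    field_names: List[str], overrides: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """Map each field name to a template string (hypothetical helper).

    A field uses its explicitly provided template when one exists in
    `overrides`; otherwise it falls back to "{{ <field name> }}".
    """
    overrides = overrides or {}
    return {
        name: overrides.get(name, "{{ " + name + " }}")
        for name in field_names
    }


# key1 gets the default template; key2 gets the explicitly provided one.
template = build_json_template(
    ["key1", "key2"], overrides={"key2": "{{ template2 }}"}
)
print(template)
```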

prepare_for_flow_run

prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: 'DeploymentResponse | None' = None, flow: 'APIFlow | None' = None, work_pool: 'WorkPool | None' = None, worker_name: str | None = None) -> None

Prepare the job configuration for a flow run.

This method is called by the worker before starting a flow run. It should be used to set any configuration values that are dependent on the flow run.

Args:

  • flow_run: The flow run to be executed.
  • deployment: The deployment that the flow run is associated with.
  • flow: The flow that the flow run is associated with.
  • work_pool: The work pool that the flow run is running in.
  • worker_name: The name of the worker that is submitting the flow run.
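
As a rough sketch of the pattern this method supports, the example below fills in run-dependent values just before a job is started. It uses stand-in classes rather than Prefect's own: `StubFlowRun`, `StubJobConfiguration`, and the `EXAMPLE_*` environment keys are all hypothetical and only illustrate the idea.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class StubFlowRun:
    """Stand-in for a flow run; carries only the fields this sketch needs."""

    id: str
    name: str


@dataclass
class StubJobConfiguration:
    """Stand-in job configuration; not Prefect's BaseJobConfiguration."""

    env: Dict[str, str] = field(default_factory=dict)
    name: Optional[str] = None

    def prepare_for_flow_run(
        self, flow_run: StubFlowRun, worker_name: Optional[str] = None
    ) -> None:
        # These values are only known once a specific flow run is assigned.
        self.name = self.name or flow_run.name
        self.env["EXAMPLE_FLOW_RUN_ID"] = flow_run.id  # hypothetical key
        if worker_name is not None:
            self.env["EXAMPLE_WORKER_NAME"] = worker_name  # hypothetical key


config = StubJobConfiguration()
config.prepare_for_flow_run(
    StubFlowRun(id="1234", name="daily-etl"), worker_name="worker-a"
)
print(config)
```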

BaseVariables

Methods:

model_json_schema

model_json_schema(cls, by_alias: bool = True, ref_template: str = '#/definitions/{model}', schema_generator: Type[GenerateJsonSchema] = GenerateJsonSchema, mode: Literal['validation', 'serialization'] = 'validation') -> dict[str, Any]

TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?

BaseWorkerResult

BaseWorker

Methods:

client

client(self) -> PrefectClient

work_pool

work_pool(self) -> WorkPool

limiter

limiter(self) -> anyio.CapacityLimiter

get_documentation_url

get_documentation_url(cls) -> str

get_logo_url

get_logo_url(cls) -> str

get_description

get_description(cls) -> str

get_default_base_job_template

get_default_base_job_template(cls) -> dict[str, Any]

get_worker_class_from_type

get_worker_class_from_type(type: str) -> Optional[Type['BaseWorker[Any, Any, Any]']]

Returns the worker class for a given worker type. If the worker type is not recognized, returns None.

get_all_available_worker_types

get_all_available_worker_types() -> list[str]

Returns all worker types available in the local registry.
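
The two lookups above behave like queries against a type-to-class registry. The sketch below shows that idea in plain Python under stated assumptions: `_REGISTRY`, `register`, `lookup_worker_class`, and `available_worker_types` are hypothetical names, and Prefect's actual registry mechanism may differ.

```python
from typing import Dict, List, Optional

# Hypothetical local registry mapping a worker type string to its class.
_REGISTRY: Dict[str, type] = {}


def register(cls: type) -> type:
    """Class decorator that records a class under its `type` attribute."""
    _REGISTRY[cls.type] = cls
    return cls


@register
class ExampleWorker:
    type = "example"


def lookup_worker_class(worker_type: str) -> Optional[type]:
    # An unrecognized type yields None rather than raising.
    return _REGISTRY.get(worker_type)


def available_worker_types() -> List[str]:
    # List every type currently present in the local registry.
    return list(_REGISTRY)


print(lookup_worker_class("example"))
print(lookup_worker_class("unknown"))
print(available_worker_types())
```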

get_name_slug

get_name_slug(self) -> str

get_flow_run_logger

get_flow_run_logger(self, flow_run: 'FlowRun') -> PrefectLogAdapter

is_worker_still_polling

is_worker_still_polling(self, query_interval_seconds: float) -> bool

This method is invoked by a webserver healthcheck handler. It returns a boolean indicating whether the worker has recorded a scheduled flow run poll within a window derived from query_interval_seconds.

query_interval_seconds is the same value used by the loop services. The check passes if _last_polled_time falls within 30 times that interval (for example, a 10 s interval gives a 5 min window).

The instance attribute self._last_polled_time is currently set and updated in get_and_submit_flow_runs().
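
The window arithmetic described above can be sketched as follows. This is an illustration rather than the worker's actual code: `polled_recently`, `POLL_WINDOW_MULTIPLIER`, and the `now` parameter are hypothetical names, and treating the boundary as inclusive is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

POLL_WINDOW_MULTIPLIER = 30  # from the description: interval x 30


def polled_recently(
    last_polled_time: datetime,
    query_interval_seconds: float,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the last poll falls inside the allowed window."""
    now = now or datetime.now(timezone.utc)
    window_seconds = query_interval_seconds * POLL_WINDOW_MULTIPLIER
    # Inclusive comparison is an assumption made for this sketch.
    return (now - last_polled_time).total_seconds() <= window_seconds


# With a 10 s interval the window is 10 * 30 = 300 s, i.e. 5 minutes.
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
healthy = polled_recently(now - timedelta(minutes=4), 10, now=now)
stale = polled_recently(now - timedelta(minutes=6), 10, now=now)
print(healthy, stale)
```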

get_status

get_status(self) -> dict[str, Any]

Retrieves the status of the current worker, including its name, its current work pool, the work pool queues it is polling, and its local settings.