Experimental Feature: The plugin system is an experimental feature under active development. The API is subject to change without notice, and features may be modified or removed in future releases.
The Prefect plugin system allows third-party packages to run hooks when Prefect is imported. This enables plugins to configure environment variables, authenticate with external services, or perform other initialization tasks automatically - whether you’re running CLI commands, Python scripts, workers, or agents.
Use Cases
The plugin system is designed for scenarios where you need to:
- Obtain short-lived credentials: Automatically fetch temporary AWS credentials, service tokens, or API keys before workflow execution
- Configure environment variables: Set up environment-specific configuration based on the execution context
- Initialize external services: Connect to secret managers, credential stores, or authentication providers
- Prepare execution environments: Ensure required resources or configurations are available before workflows run
Quick Start
Enabling the Plugin System
The plugin system is opt-in and disabled by default. Enable it by setting the environment variable:
export PREFECT_EXPERIMENTS_PLUGINS_ENABLED=1
Once enabled, plugins will automatically run whenever Prefect is imported - this includes:
- Python scripts that import Prefect (import prefect)
- CLI commands (prefect deploy, prefect server start, etc.)
- Workers and agents starting up
- Any process that uses Prefect
This ensures your environment is properly configured before any Prefect code executes.
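If exporting the variable for the whole shell is broader than you want, the flag can be scoped to a single child process. The sketch below uses only the standard library. The child command is a placeholder that echoes the variable and stands in for a real Prefect script or CLI call.

```python
import os
import subprocess
import sys

# Copy the current environment and switch the plugin system on for the child only.
child_env = {**os.environ, "PREFECT_EXPERIMENTS_PLUGINS_ENABLED": "1"}

# Placeholder child command: a real invocation would run a Prefect script or CLI.
child_code = "import os; print(os.environ['PREFECT_EXPERIMENTS_PLUGINS_ENABLED'])"
completed = subprocess.run(
    [sys.executable, "-c", child_code],
    env=child_env,
    capture_output=True,
    text=True,
    check=True,
)
flag_seen_by_child = completed.stdout.strip()
print(flag_seen_by_child)
```

The parent process environment is left untouched, so other tools started from the same shell are unaffected.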
Example Plugin
Here’s a minimal example plugin that sets environment variables:
# my_plugin/__init__.py
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"


@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult:
    """Configure environment before Prefect starts."""
    logger = ctx.logger_factory("my-plugin")

    # Perform authentication or configuration
    credentials = fetch_credentials()
    logger.info("Configured credentials")

    return SetupResult(
        env={
            "MY_SERVICE_TOKEN": credentials.token,
            "MY_SERVICE_URL": "https://api.example.com",
        },
        note="Configured my-service credentials",
        required=True,  # Abort if this plugin fails in strict mode
    )


def fetch_credentials():
    # Placeholder: replace with your authentication logic.
    # It must return an object with a .token attribute.
    raise NotImplementedError
Register the plugin in your pyproject.toml:
[project]
name = "my-plugin"
version = "0.1.0"
dependencies = ["prefect>=3.4"]
[project.entry-points."prefect.plugins"]
my_plugin = "my_plugin"
Install the plugin:
Configuration
Environment Variables
Configure the plugin system with these environment variables:
| Variable | Description | Default |
|---|---|---|
| PREFECT_EXPERIMENTS_PLUGINS_ENABLED | Enable/disable the plugin system | 0 (disabled) |
| PREFECT_EXPERIMENTS_PLUGINS_ALLOW | Comma-separated list of allowed plugin names | None (all allowed) |
| PREFECT_EXPERIMENTS_PLUGINS_DENY | Comma-separated list of denied plugin names | None (none denied) |
| PREFECT_EXPERIMENTS_PLUGINS_SETUP_TIMEOUT_SECONDS | Maximum time for all plugins to complete | 20 |
| PREFECT_EXPERIMENTS_PLUGINS_STRICT | Exit if a required plugin fails | 0 (disabled) |
| PREFECT_EXPERIMENTS_PLUGINS_SAFE_MODE | Load plugins without executing hooks | 0 (disabled) |
Examples
Allow only specific plugins:
export PREFECT_EXPERIMENTS_PLUGINS_ALLOW="aws-plugin,gcp-plugin"
Deny problematic plugins:
export PREFECT_EXPERIMENTS_PLUGINS_DENY="legacy-plugin"
Enable strict mode for production:
export PREFECT_EXPERIMENTS_PLUGINS_STRICT=1
Debug plugins without execution:
export PREFECT_EXPERIMENTS_PLUGINS_SAFE_MODE=1
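To reason about how the allow and deny lists interact, the following sketch models the filtering in plain Python. It is not Prefect's implementation. The helper names are hypothetical, and the rule that a deny entry takes precedence over an allow entry is an assumption rather than something this page states.

```python
from __future__ import annotations


def parse_names(raw: str | None) -> set[str] | None:
    """Split a comma-separated value into a set of names; None means 'not set'."""
    if not raw:
        return None
    return {part.strip() for part in raw.split(",") if part.strip()}


def is_plugin_allowed(name: str, allow: str | None, deny: str | None) -> bool:
    """Assumed rule: deny wins; otherwise the allow list (if set) must contain the name."""
    denied = parse_names(deny) or set()
    allowed = parse_names(allow)
    if name in denied:
        return False
    return allowed is None or name in allowed


# Mirrors the two export examples above.
print(is_plugin_allowed("aws-plugin", "aws-plugin,gcp-plugin", None))
print(is_plugin_allowed("legacy-plugin", None, "legacy-plugin"))
```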
Plugin API
Hook Specification
Plugins implement the setup_environment hook to configure the environment or run code on startup. The simplest approach is to use a decorated function:
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"


@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult | None:
    """
    Prepare process environment for Prefect.

    Args:
        ctx: Context with Prefect version, API URL, and logger factory

    Returns:
        SetupResult with environment variables, or None for no changes
    """
    logger = ctx.logger_factory("my-plugin")
    logger.info(f"Running with Prefect {ctx.prefect_version}")

    # Return None if no setup needed
    if not should_configure():
        return None

    # Return SetupResult with configuration
    return SetupResult(
        env={"KEY": "value"},
        note="Short description",
        required=False,  # Optional
    )
Required Decorator: The @register_hook decorator is required to mark your function as a plugin hook implementation. Without it, your plugin will not be discovered by the plugin system.
Entry Point: When using the function-based pattern, your entry point should reference the module (e.g., "my_plugin") rather than a specific class or instance.
Alternative: Class-Based Plugins
For plugins that need to maintain state, you can also use a class-based approach:
from prefect._experimental.plugins import register_hook, HookContext, SetupResult


class MyPlugin:
    def __init__(self):
        self.state = {}

    @register_hook
    def setup_environment(self, *, ctx: HookContext) -> SetupResult | None:
        # Access instance state
        return SetupResult(env={"KEY": "value"})


# Create instance at module level
Plugin = MyPlugin()
Entry point: "my_plugin:Plugin"
HookContext
The context object passed to plugins:
from dataclasses import dataclass
from typing import Callable
import logging


@dataclass
class HookContext:
    prefect_version: str  # e.g., "3.0.0"
    api_url: str | None  # Configured Prefect API URL
    logger_factory: Callable[[str], logging.Logger]  # Create loggers
SetupResult
The result returned by plugins:
from dataclasses import dataclass
from typing import Mapping
from datetime import datetime


@dataclass
class SetupResult:
    env: Mapping[str, str]  # Environment variables to set
    note: str | None = None  # Human-readable description
    required: bool = False  # Abort in strict mode if fails
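Because a hook is an ordinary function that takes a context and returns a result, its logic can be exercised without starting Prefect. The sketch below uses stand-in dataclasses that copy the fields documented above. FakeHookContext, FakeSetupResult, and the hook body are hypothetical names for illustration, not part of the Prefect API.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Callable, Mapping, Optional


# Stand-ins mirroring the documented fields; these are not the real Prefect classes.
@dataclass
class FakeHookContext:
    prefect_version: str
    api_url: Optional[str]
    logger_factory: Callable[[str], logging.Logger]


@dataclass
class FakeSetupResult:
    env: Mapping[str, str]
    note: Optional[str] = None
    required: bool = False


def setup_environment(*, ctx: FakeHookContext) -> Optional[FakeSetupResult]:
    """Hypothetical hook: only configure when an API URL is set."""
    if ctx.api_url is None:
        return None
    return FakeSetupResult(env={"MY_SERVICE_URL": ctx.api_url}, note="Copied API URL")


# logging.getLogger already has the Callable[[str], Logger] shape the context expects.
ctx = FakeHookContext("3.4.0", "http://localhost:4200/api", logging.getLogger)
result = setup_environment(ctx=ctx)
skipped = setup_environment(ctx=FakeHookContext("3.4.0", None, logging.getLogger))
print(result, skipped)
```

In a real plugin the same checks would run against the imported HookContext and SetupResult, with the decorator applied.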
Example: AWS Credentials Plugin
Here’s a complete example of a plugin that assumes an AWS IAM role and provides temporary credentials:
# prefect_aws_setup/__init__.py
from __future__ import annotations

import os
from datetime import timezone

import botocore.session
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"


@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult | None:
    """Assume AWS IAM role and provide temporary credentials."""
    logger = ctx.logger_factory("prefect-aws-setup")

    role_arn = os.getenv("PREFECT_AWS_SETUP_ROLE_ARN")
    if not role_arn:
        logger.debug("PREFECT_AWS_SETUP_ROLE_ARN not set, skipping")
        return None

    profile = os.getenv("PREFECT_AWS_SETUP_PROFILE")
    region = os.getenv("AWS_REGION", "us-east-1")
    duration = int(os.getenv("PREFECT_AWS_SETUP_DURATION", "3600"))

    try:
        # Create AWS session and assume role
        session = botocore.session.Session(profile=profile)
        sts = session.create_client("sts", region_name=region)
        response = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName="prefect-plugin",
            DurationSeconds=duration,
        )
        credentials = response["Credentials"]

        return SetupResult(
            env={
                "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
                "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
                "AWS_SESSION_TOKEN": credentials["SessionToken"],
                "AWS_REGION": region,
            },
            note=f"Assumed role {role_arn.split('/')[-1]}",
            required=bool(os.getenv("PREFECT_AWS_SETUP_REQUIRED")),
        )
    except Exception:
        logger.exception("Failed to assume AWS role")
        if os.getenv("PREFECT_AWS_SETUP_REQUIRED"):
            raise
        return None
Package configuration:
# pyproject.toml
[project]
name = "prefect-aws-setup"
version = "0.1.0"
dependencies = ["prefect>=3.4", "botocore>=1.34"]
[project.entry-points."prefect.plugins"]
aws_setup = "prefect_aws_setup"
Installation:
Usage:
# Configure the plugin
export PREFECT_AWS_SETUP_ROLE_ARN="arn:aws:iam::123456789012:role/PrefectRole"
export PREFECT_AWS_SETUP_PROFILE="my-aws-profile"
export AWS_REGION="us-west-2"
export PREFECT_AWS_SETUP_REQUIRED=1
# Enable plugins
export PREFECT_EXPERIMENTS_PLUGINS_ENABLED=1
# Run Prefect commands - credentials are automatically configured
prefect deploy --all
Diagnostics
Use the diagnostic command to troubleshoot plugin issues:
prefect experimental plugins diagnose
Example output:
Prefect Experimental Plugin System Diagnostics
Enabled: True
Timeout: 20.0s
Strict mode: False
Safe mode: False
Allow list: None
Deny list: None
Discoverable Plugins (entry point group: prefect.plugins)
• aws-setup: active
Module: prefect_aws_setup:Plugin
API requirement: >=0.1,<1
Running Startup Hooks
• aws-setup: success
Environment variables: 4
AWS_ACCESS_KEY_ID=••••••
AWS_SECRET_ACCESS_KEY=••••••
AWS_SESSION_TOKEN=••••••
AWS_REGION=us-west-2
Note: Assumed role PrefectRole
Expires: 2024-01-15 18:30:00+00:00
Security Considerations
Security Best Practices
- Only install plugins from trusted sources
- Review plugin source code before installation
- Use PREFECT_EXPERIMENTS_PLUGINS_DENY to quarantine known-bad plugins
- Sensitive values are automatically redacted in logs and diagnostics
- Plugins run with the same permissions as Prefect
Secret Redaction
The plugin system automatically redacts sensitive environment variables in logs and diagnostics. Variables containing these keywords are redacted:
SECRET
TOKEN
PASSWORD
KEY
Example:
# This environment variable will be redacted
{"AWS_SECRET_ACCESS_KEY": "••••••"}
# This will not
{"AWS_REGION": "us-east-1"}
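The keyword rule can be approximated in a few lines. This is a sketch of the idea, not Prefect's redaction code. The function name is hypothetical, and matching the keywords as case-insensitive substrings is an assumption.

```python
from __future__ import annotations

SENSITIVE_KEYWORDS = ("SECRET", "TOKEN", "PASSWORD", "KEY")
MASK = "••••••"


def redact_env(env: dict[str, str]) -> dict[str, str]:
    """Return a copy of env with values masked when the name contains a keyword."""
    redacted = {}
    for name, value in env.items():
        # Assumption: substring match on the upper-cased variable name.
        is_sensitive = any(keyword in name.upper() for keyword in SENSITIVE_KEYWORDS)
        redacted[name] = MASK if is_sensitive else value
    return redacted


sample = {"AWS_SECRET_ACCESS_KEY": "abc123", "AWS_REGION": "us-east-1"}
print(redact_env(sample))
```

Under a substring rule like this one, a harmless name such as KEYBOARD_LAYOUT would also be masked, while a secret stored under a name without any keyword would not. Choosing variable names that contain one of the keywords keeps secrets out of diagnostics.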
Permissions
Plugins execute with the same permissions as the Prefect process. Be cautious about:
- Environment variables can affect all child processes
- File system access matches the Prefect process user
- Network calls are unrestricted
- Plugins can import any installed package
Behavior & Guarantees
Import-Time Execution
Plugins run automatically when Prefect is imported, before any other Prefect code executes. This means:
- Environment variables are available to all Prefect operations
- Credentials are configured before connecting to APIs
- Setup happens once per process, not per CLI command
If plugin initialization fails (and strict mode is disabled), Prefect will log the error and continue loading.
Execution Order
- Plugins are discovered via entry points in the prefect.plugins group
- Execution order is not guaranteed
- Multiple plugins may modify the same environment variables (last write wins)
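Since order is not guaranteed and the last write wins, two plugins that set the same variable can produce different outcomes from run to run. The sketch below models that merge and reports which keys were written more than once. merge_env is a hypothetical helper, and the plugin names are made up for illustration.

```python
def merge_env(results):
    """Merge (plugin_name, env) pairs in order; later values overwrite earlier ones."""
    merged = {}
    writers = {}
    for plugin_name, env in results:
        for key, value in env.items():
            merged[key] = value
            writers.setdefault(key, []).append(plugin_name)
    # Keys written by more than one plugin depend on execution order.
    conflicts = {key: names for key, names in writers.items() if len(names) > 1}
    return merged, conflicts


merged, conflicts = merge_env(
    [
        ("plugin-a", {"AWS_REGION": "us-east-1"}),
        ("plugin-b", {"AWS_REGION": "us-west-2", "MY_SERVICE_URL": "https://api.example.com"}),
    ]
)
print(merged)
print(conflicts)
```

Giving each plugin its own variable prefix avoids collisions of this kind.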
Error Handling
- Plugin errors are isolated and logged
- One plugin’s failure doesn’t affect others
- In strict mode with required=True, plugin failures abort startup
- Timeouts apply globally to all plugins combined
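The isolation described above amounts to running each hook inside its own error boundary. The following is a minimal sketch of that pattern under simplifying assumptions: hooks are plain zero-argument callables, and neither strict mode nor the global timeout is modeled. run_isolated is a hypothetical name.

```python
from __future__ import annotations

import logging
from typing import Callable, Mapping

logger = logging.getLogger("plugin-sketch")


def run_isolated(hooks: Mapping[str, Callable[[], dict]]) -> tuple[dict, dict]:
    """Run every hook; collect results and errors separately."""
    results: dict[str, dict] = {}
    errors: dict[str, Exception] = {}
    for name, hook in hooks.items():
        try:
            results[name] = hook()
        except Exception as exc:  # isolate the failure to this plugin
            logger.warning("Plugin %s failed: %s", name, exc)
            errors[name] = exc
    return results, errors


def broken_hook() -> dict:
    raise RuntimeError("boom")


results, errors = run_isolated({"good": lambda: {"A": "1"}, "bad": broken_hook})
print(results, list(errors))
```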
Async Support
Plugins can be either synchronous or asynchronous:
from prefect._experimental.plugins import register_hook, HookContext, SetupResult


# Synchronous plugin
@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult:
    # Synchronous implementation
    return SetupResult(env={"KEY": "value"})


# Asynchronous plugin
@register_hook
async def setup_environment(*, ctx: HookContext) -> SetupResult:
    # Asynchronous implementation
    data = await fetch_data()
    return SetupResult(env={"KEY": data})
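When testing hooks by hand, one helper can drive both flavors by checking whether the call returned an awaitable. This is a sketch using only the standard library and says nothing about how Prefect itself schedules hooks. call_hook and the two sample hooks are hypothetical.

```python
import asyncio
import inspect


async def _resolve(awaitable):
    """Wrap any awaitable in a coroutine so asyncio.run accepts it."""
    return await awaitable


def call_hook(hook, **kwargs):
    """Call a hook and, if it returned an awaitable, run it to completion."""
    outcome = hook(**kwargs)
    if inspect.isawaitable(outcome):
        # asyncio.run starts a fresh event loop; it cannot be used inside a running one.
        return asyncio.run(_resolve(outcome))
    return outcome


def sync_hook(*, ctx):
    return {"KEY": "value"}


async def async_hook(*, ctx):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return {"KEY": "fetched"}


sync_result = call_hook(sync_hook, ctx=None)
async_result = call_hook(async_hook, ctx=None)
print(sync_result, async_result)
```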
Idempotence
Plugins should be idempotent - multiple invocations should produce the same result. Prefect may call plugins multiple times during initialization or in different processes.
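A common way to break idempotence is to append to a list-like variable such as PATH on every call. The sketch below shows one way to compute such a value so that a second invocation changes nothing. prepend_once is a hypothetical helper and the directory names are examples.

```python
import os


def prepend_once(path_value: str, directory: str) -> str:
    """Return path_value with directory in front, unless it is already present."""
    parts = [part for part in path_value.split(os.pathsep) if part]
    if directory in parts:
        return path_value  # already configured: repeat calls are no-ops
    return os.pathsep.join([directory, *parts])


once = prepend_once("/usr/bin", "/opt/tools")
twice = prepend_once(once, "/opt/tools")
print(once == twice)
```

A hook that returns a value computed this way yields the same environment whether it runs once or several times.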
Version Compatibility
Specifying Version Requirements
Plugins should declare their API version compatibility using the PREFECT_PLUGIN_API_REQUIRES attribute:
from prefect._experimental.plugins import register_hook, HookContext

# At module level in your plugin
PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"  # Supports API versions from 0.1 up to, but not including, 1.0


class Plugin:
    @register_hook
    def setup_environment(self, *, ctx: HookContext):
        # Your plugin implementation
        pass
The version specifier follows PEP 440 syntax:
- ">=0.1,<1": compatible with every release from 0.1 up to, but not including, 1.0
- ">=0.1": compatible with 0.1 and all future versions
- "==0.1": only compatible with exactly version 0.1
The current plugin API version is 0.1. This will be incremented when breaking changes are made to the plugin interface.
Version Validation
When Prefect loads plugins, it automatically validates version compatibility:
- Compatible versions: Plugin loads normally and executes
- Incompatible versions: Plugin is skipped with a warning log message
- Invalid specifiers: Warning is logged, but plugin loads anyway (best-effort)
- Missing attribute: Defaults to ">=0.1,<1" if not specified
Example logs:
# Incompatible version
WARNING: Skipping plugin my-plugin: requires API version >=1.0, current version is 0.1
# Invalid specifier
DEBUG: Plugin my-plugin has invalid version specifier 'not-valid', ignoring version check
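The validation rules above can be reproduced with the third-party packaging library, which implements PEP 440 specifiers. This is a sketch for checking a specifier before publishing a plugin. It assumes packaging is installed, is_compatible is a hypothetical helper, and whether Prefect uses the same library internally is not stated here.

```python
from packaging.specifiers import InvalidSpecifier, SpecifierSet

PLUGIN_API_VERSION = "0.1"  # current API version stated in this section
DEFAULT_REQUIREMENT = ">=0.1,<1"  # used when the attribute is missing


def is_compatible(requirement, api_version=PLUGIN_API_VERSION):
    """Return True when the plugin should load under the rules listed above."""
    spec_text = requirement or DEFAULT_REQUIREMENT
    try:
        spec = SpecifierSet(spec_text)
    except InvalidSpecifier:
        return True  # invalid specifier: best-effort, load anyway
    return spec.contains(api_version)


print(is_compatible(">=0.1,<1"))
print(is_compatible(">=1.0"))
print(is_compatible("not-valid"))
print(is_compatible(None))
```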
Checking Compatibility
Use the diagnostics command to see plugin version requirements:
prefect experimental plugins diagnose
Output shows API requirements for each plugin:
• my-plugin: active
Module: my_plugin:Plugin
API requirement: >=0.1,<1
Troubleshooting
Plugin Not Running
- Verify plugins are enabled:
  echo $PREFECT_EXPERIMENTS_PLUGINS_ENABLED
- Check plugin is discoverable:
  prefect experimental plugins diagnose
- Verify entry point is registered:
  pip show your-plugin-package
Plugin Errors
- Enable safe mode to test loading without execution:
  PREFECT_EXPERIMENTS_PLUGINS_SAFE_MODE=1 prefect experimental plugins diagnose
- Check plugin logs:
  PREFECT_LOGGING_LEVEL=DEBUG prefect --version
- Test plugin in isolation:
  PREFECT_EXPERIMENTS_PLUGINS_ALLOW="your-plugin" prefect experimental plugins diagnose
Version Compatibility Issues
If a plugin isn’t loading due to version mismatch:
- Check the plugin’s API requirement:
  prefect experimental plugins diagnose
  Look for warnings about incompatible API versions.
- Update the plugin to a version compatible with your Prefect installation, or update Prefect to match the plugin’s requirements.
- If you’re developing a plugin, adjust PREFECT_PLUGIN_API_REQUIRES to match the current API version:
  PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1" # Current API version is 0.1
- Enable debug logging to see detailed version validation:
  PREFECT_LOGGING_LEVEL=DEBUG prefect --version
Timeout Issues
Increase the timeout for slow operations:
export PREFECT_EXPERIMENTS_PLUGINS_SETUP_TIMEOUT_SECONDS=60
Or optimize plugin startup time by deferring expensive operations.
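Before raising the timeout, it can help to measure how long a hook actually takes, since the budget is shared by all plugins. The following sketch times a single callable with the standard library. timed and slow_hook are hypothetical names, and the sleep stands in for a network call.

```python
import time
from typing import Any, Callable, Tuple


def timed(hook: Callable[[], Any]) -> Tuple[Any, float]:
    """Run hook once and return its result with the elapsed wall-clock seconds."""
    start = time.perf_counter()
    result = hook()
    return result, time.perf_counter() - start


def slow_hook() -> dict:
    time.sleep(0.05)  # stand-in for a slow network call
    return {"KEY": "value"}


result, seconds = timed(slow_hook)
print(f"hook took {seconds:.3f}s")
```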