prefect.cache_policies

Classes

CachePolicy

Base class for all cache policies. Methods:

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

configure

configure(self, key_storage: Union['WritableFileSystem', str, Path, None] = None, lock_manager: Optional['LockManager'] = None, isolation_level: Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], 'IsolationLevel', None] = None) -> Self
Configure the cache policy with the given key storage, lock manager, and isolation level. Args:
  • key_storage: The storage to use for cache keys. If not provided, the current key storage will be used.
  • lock_manager: The lock manager to use for the cache policy. If not provided, the current lock manager will be used.
  • isolation_level: The isolation level to use for the cache policy. If not provided, the current isolation level will be used.
Returns:
  • A new cache policy with the given key storage, lock manager, and isolation level.

from_cache_key_fn

from_cache_key_fn(cls, cache_key_fn: Callable[['TaskRunContext', Dict[str, Any]], Optional[str]]) -> 'CacheKeyFnPolicy'
Given a function generates a key policy.

CacheKeyFnPolicy

This policy accepts a custom function with signature f(task_run_context, task_parameters, flow_parameters) -> str and uses it to compute a task run cache key. Methods:

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

configure

configure(self, key_storage: Union['WritableFileSystem', str, Path, None] = None, lock_manager: Optional['LockManager'] = None, isolation_level: Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], 'IsolationLevel', None] = None) -> Self
Configure the cache policy with the given key storage, lock manager, and isolation level. Args:
  • key_storage: The storage to use for cache keys. If not provided, the current key storage will be used.
  • lock_manager: The lock manager to use for the cache policy. If not provided, the current lock manager will be used.
  • isolation_level: The isolation level to use for the cache policy. If not provided, the current isolation level will be used.
Returns:
  • A new cache policy with the given key storage, lock manager, and isolation level.

from_cache_key_fn

from_cache_key_fn(cls, cache_key_fn: Callable[['TaskRunContext', Dict[str, Any]], Optional[str]]) -> 'CacheKeyFnPolicy'
Given a function generates a key policy.

CompoundCachePolicy

This policy is constructed from two or more other cache policies and works by computing the keys for each policy individually, and then hashing a sorted tuple of all computed keys. Any keys that return None will be ignored. Methods:

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

configure

configure(self, key_storage: Union['WritableFileSystem', str, Path, None] = None, lock_manager: Optional['LockManager'] = None, isolation_level: Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], 'IsolationLevel', None] = None) -> Self
Configure the cache policy with the given key storage, lock manager, and isolation level. Args:
  • key_storage: The storage to use for cache keys. If not provided, the current key storage will be used.
  • lock_manager: The lock manager to use for the cache policy. If not provided, the current lock manager will be used.
  • isolation_level: The isolation level to use for the cache policy. If not provided, the current isolation level will be used.
Returns:
  • A new cache policy with the given key storage, lock manager, and isolation level.

from_cache_key_fn

from_cache_key_fn(cls, cache_key_fn: Callable[['TaskRunContext', Dict[str, Any]], Optional[str]]) -> 'CacheKeyFnPolicy'
Given a function generates a key policy.

TaskSource

Policy for computing a cache key based on the source code of the task. This policy only considers raw lines of code in the task, and not the source code of nested tasks. Methods:

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: Optional[dict[str, Any]], flow_parameters: Optional[dict[str, Any]], **kwargs: Any) -> Optional[str]

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

configure

configure(self, key_storage: Union['WritableFileSystem', str, Path, None] = None, lock_manager: Optional['LockManager'] = None, isolation_level: Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], 'IsolationLevel', None] = None) -> Self
Configure the cache policy with the given key storage, lock manager, and isolation level. Args:
  • key_storage: The storage to use for cache keys. If not provided, the current key storage will be used.
  • lock_manager: The lock manager to use for the cache policy. If not provided, the current lock manager will be used.
  • isolation_level: The isolation level to use for the cache policy. If not provided, the current isolation level will be used.
Returns:
  • A new cache policy with the given key storage, lock manager, and isolation level.

from_cache_key_fn

from_cache_key_fn(cls, cache_key_fn: Callable[['TaskRunContext', Dict[str, Any]], Optional[str]]) -> 'CacheKeyFnPolicy'
Given a function generates a key policy.

FlowParameters

Policy that computes the cache key based on a hash of the flow parameters. Methods:

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

configure

configure(self, key_storage: Union['WritableFileSystem', str, Path, None] = None, lock_manager: Optional['LockManager'] = None, isolation_level: Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], 'IsolationLevel', None] = None) -> Self
Configure the cache policy with the given key storage, lock manager, and isolation level. Args:
  • key_storage: The storage to use for cache keys. If not provided, the current key storage will be used.
  • lock_manager: The lock manager to use for the cache policy. If not provided, the current lock manager will be used.
  • isolation_level: The isolation level to use for the cache policy. If not provided, the current isolation level will be used.
Returns:
  • A new cache policy with the given key storage, lock manager, and isolation level.

from_cache_key_fn

from_cache_key_fn(cls, cache_key_fn: Callable[['TaskRunContext', Dict[str, Any]], Optional[str]]) -> 'CacheKeyFnPolicy'
Given a function generates a key policy.

RunId

Returns either the prevailing flow run ID, or if not found, the prevailing task run ID. Methods:

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

configure

configure(self, key_storage: Union['WritableFileSystem', str, Path, None] = None, lock_manager: Optional['LockManager'] = None, isolation_level: Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], 'IsolationLevel', None] = None) -> Self
Configure the cache policy with the given key storage, lock manager, and isolation level. Args:
  • key_storage: The storage to use for cache keys. If not provided, the current key storage will be used.
  • lock_manager: The lock manager to use for the cache policy. If not provided, the current lock manager will be used.
  • isolation_level: The isolation level to use for the cache policy. If not provided, the current isolation level will be used.
Returns:
  • A new cache policy with the given key storage, lock manager, and isolation level.

from_cache_key_fn

from_cache_key_fn(cls, cache_key_fn: Callable[['TaskRunContext', Dict[str, Any]], Optional[str]]) -> 'CacheKeyFnPolicy'
Given a function generates a key policy.

Inputs

Policy that computes a cache key based on a hash of the runtime inputs provided to the task.. Methods:

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

compute_key

compute_key(self, task_ctx: TaskRunContext, inputs: dict[str, Any], flow_parameters: dict[str, Any], **kwargs: Any) -> Optional[str]

configure

configure(self, key_storage: Union['WritableFileSystem', str, Path, None] = None, lock_manager: Optional['LockManager'] = None, isolation_level: Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], 'IsolationLevel', None] = None) -> Self
Configure the cache policy with the given key storage, lock manager, and isolation level. Args:
  • key_storage: The storage to use for cache keys. If not provided, the current key storage will be used.
  • lock_manager: The lock manager to use for the cache policy. If not provided, the current lock manager will be used.
  • isolation_level: The isolation level to use for the cache policy. If not provided, the current isolation level will be used.
Returns:
  • A new cache policy with the given key storage, lock manager, and isolation level.

from_cache_key_fn

from_cache_key_fn(cls, cache_key_fn: Callable[['TaskRunContext', Dict[str, Any]], Optional[str]]) -> 'CacheKeyFnPolicy'
Given a function generates a key policy.