The simplest way to cache the results of tasks within a flow is to set persist_result=True on a task definition.
from prefect import task, flow


@task(persist_result=True)
def add_one(x: int):
    return x + 1


@flow
def my_flow():
    add_one(1)  # will not be cached
    add_one(1)  # will be cached
    add_one(2)  # will not be cached


if __name__ == "__main__":
    my_flow()
Output
16:28:51.230 | INFO | Flow run 'outgoing-cicada' - Beginning flow run 'outgoing-cicada' for flow 'my-flow'
16:28:51.257 | INFO | Task run 'add_one-d85' - Finished in state Completed()
16:28:51.276 | INFO | Task run 'add_one-eff' - Finished in state Cached(type=COMPLETED)
16:28:51.294 | INFO | Task run 'add_one-d16' - Finished in state Completed()
16:28:51.311 | INFO | Flow run 'outgoing-cicada' - Finished in state Completed()
This will implicitly use the DEFAULT cache policy, which is a composite cache policy defined as:
DEFAULT = INPUTS + TASK_SOURCE + RUN_ID
This means subsequent calls of a task with identical inputs from within the same parent run will return cached results without executing the body of the function.
The TASK_SOURCE component of the DEFAULT cache policy helps avoid naming collisions between similar tasks that should not share a cache.
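To make the idea of a composite policy concrete, here is a minimal conceptual sketch in plain Python. It is not Prefect's implementation: the composite_key helper, the hashing scheme, and the sample source strings and run IDs are all assumptions made for illustration. It only shows why a key built from inputs, task source, and run ID matches for a repeated call in the same run and differs when any one component changes.

```python
import hashlib
import json


def _digest(text: str) -> str:
    """Return a stable hex digest for a piece of text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def composite_key(inputs: dict, task_source: str, run_id: str) -> str:
    """Hypothetical helper: combine three component keys into one,
    in the spirit of INPUTS + TASK_SOURCE + RUN_ID."""
    parts = [
        _digest(json.dumps(inputs, sort_keys=True)),  # inputs component
        _digest(task_source),                         # task source component
        _digest(run_id),                              # run ID component
    ]
    return _digest("|".join(parts))


# Illustrative values only; real task source and run IDs look different.
ADD_ONE_SRC = "def add_one(x): return x + 1"
ADD_TWO_SRC = "def add_one(x): return x + 2"

first_call = composite_key({"x": 1}, ADD_ONE_SRC, "run-1")
repeat_call = composite_key({"x": 1}, ADD_ONE_SRC, "run-1")
other_input = composite_key({"x": 2}, ADD_ONE_SRC, "run-1")
other_run = composite_key({"x": 1}, ADD_ONE_SRC, "run-2")
other_source = composite_key({"x": 1}, ADD_TWO_SRC, "run-1")

print("repeat call shares key:", first_call == repeat_call)
print("different input shares key:", first_call == other_input)
print("different run shares key:", first_call == other_run)
print("different source shares key:", first_call == other_source)
```

In this model, only the repeated call with the same inputs, source, and run produces a matching key, which mirrors the behavior described above for add_one(1) being served from the cache on its second call.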
To cache the result of a task based only on task inputs, set cache_policy=INPUTS in the task decorator:
from prefect import task
from prefect.cache_policies import INPUTS
import time


@task(cache_policy=INPUTS)
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1


my_stateful_task(x=1)  # sleeps
my_stateful_task(x=1)  # does not sleep
my_stateful_task(x=2)  # sleeps
The above task will sleep the first time it is called with x=1, but will not sleep for any subsequent calls with the same input.

Prefect ships with several cache policies that can be used to customize caching behavior.
To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE setting.
Setting PREFECT_TASKS_REFRESH_CACHE=true changes the default behavior of all tasks to refresh.

If you have tasks that should not refresh when this setting is enabled, set refresh_cache to False on those tasks. They will never refresh the cache: if a cache key exists, it will be read, not updated. If a cache key does not exist yet, these tasks can still write to the cache.
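The refresh rules can be modeled with a small plain-Python sketch. This is an illustration of the behavior described above, not Prefect code: run_task, its parameters, and the dictionary cache are hypothetical, and treating an unset per-task refresh_cache as "fall back to the global setting" is an assumption of the model.

```python
from typing import Callable, Dict, Optional


def run_task(
    cache: Dict[str, int],
    key: str,
    compute: Callable[[], int],
    global_refresh: bool,
    refresh_cache: Optional[bool] = None,
) -> int:
    """Hypothetical model of the refresh rules; not a Prefect API."""
    # Assumption: an explicit per-task value overrides the global setting.
    refresh = global_refresh if refresh_cache is None else refresh_cache
    if not refresh and key in cache:
        return cache[key]  # existing key: read, not updated
    cache[key] = compute()  # missing key, or refresh requested: write
    return cache[key]


cache = {"a": 1, "b": 1}

# Global refresh enabled, no per-task override: the value is recomputed.
refreshed = run_task(cache, "a", lambda: 2, global_refresh=True)

# Task opts out of refreshing: the existing value is read and left alone.
opted_out = run_task(cache, "b", lambda: 2, global_refresh=True, refresh_cache=False)

# Task opts out, but the key is missing: it still writes to the cache.
first_write = run_task(cache, "c", lambda: 3, global_refresh=True, refresh_cache=False)

print(refreshed, opted_out, first_write)
print(cache)
```

The third call is the case that is easy to miss: opting out of refresh only prevents overwriting an existing record, so a task with no cached record yet still stores its first result.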
By default, Prefect stores results locally in ~/.prefect/storage/. To share the cache across tasks running on different machines, provide a storage block to the result_storage parameter on the task decorator.

Here’s an example of a task that uses an S3 bucket to store cache records:
from prefect import task
from prefect.cache_policies import INPUTS
from prefect_aws import AwsCredentials, S3Bucket

s3_bucket = S3Bucket(
    credentials=AwsCredentials(
        aws_access_key_id="my-access-key-id",
        aws_secret_access_key="my-secret-access-key",
    ),
    bucket_name="my-bucket",
)

# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")


@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
    return x + 42
When using a storage block from a Prefect integration package, the package the storage block is imported from must be installed in all environments where the task will run. For example, the prefect_aws package must be installed to use the S3Bucket storage block.