The simplest way to cache the results of tasks within a flow is to set persist_result=True on a task definition.
```python
from prefect import task, flow


@task(persist_result=True)
def add_one(x: int):
    return x + 1


@flow
def my_flow():
    add_one(1)  # will not be cached
    add_one(1)  # will be cached
    add_one(2)  # will not be cached


if __name__ == "__main__":
    my_flow()
```
```
16:28:51.230 | INFO | Flow run 'outgoing-cicada' - Beginning flow run 'outgoing-cicada' for flow 'my-flow'
16:28:51.257 | INFO | Task run 'add_one-d85' - Finished in state Completed()
16:28:51.276 | INFO | Task run 'add_one-eff' - Finished in state Cached(type=COMPLETED)
16:28:51.294 | INFO | Task run 'add_one-d16' - Finished in state Completed()
16:28:51.311 | INFO | Flow run 'outgoing-cicada' - Finished in state Completed()
```
This will implicitly use the DEFAULT cache policy, which is a composite cache policy defined as:
```python
DEFAULT = INPUTS + TASK_SOURCE + RUN_ID
```
This means subsequent calls of a task with identical inputs from within the same parent run will return cached results without executing the body of the function.
The TASK_SOURCE component of the DEFAULT cache policy helps avoid naming collisions between similar tasks that should not share a cache.
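Cache policies can also be composed with + to build your own. As a minimal sketch (assuming the composable cache policies available in Prefect 3.x), a task could drop the RUN_ID component so that identical calls share cached results across flow runs:

```python
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE


# cache on inputs and task source only; without RUN_ID, identical calls
# can reuse cached results even from different parent flow runs
@task(cache_policy=INPUTS + TASK_SOURCE)
def add_one(x: int):
    return x + 1
```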
To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE setting.
Setting PREFECT_TASKS_REFRESH_CACHE=true changes the default behavior of all tasks to refresh their caches.
If you have tasks that should not refresh when this setting is enabled, set refresh_cache to False. These tasks will never refresh the cache: if a cache key exists it will be read, not updated.
If a cache key does not exist yet, these tasks can still write to the cache.
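As a sketch, a task that opts out of refreshing might look like this (with PREFECT_TASKS_REFRESH_CACHE=true set in the environment):

```python
from prefect import task
from prefect.cache_policies import INPUTS


# even when PREFECT_TASKS_REFRESH_CACHE=true, this task reads an existing
# cache key instead of recomputing and overwriting it
@task(cache_policy=INPUTS, refresh_cache=False)
def cached_lookup(x: int):
    return x + 1
```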
By default Prefect stores results locally in ~/.prefect/storage/. To share the cache across tasks running on different machines, provide a storage block to the result_storage parameter on the task decorator.
Here’s an example of a task that uses an S3 bucket to store cache records:
```python
from prefect import task
from prefect.cache_policies import INPUTS
from prefect_aws import AwsCredentials, S3Bucket

s3_bucket = S3Bucket(
    credentials=AwsCredentials(
        aws_access_key_id="my-access-key-id",
        aws_secret_access_key="my-secret-access-key",
    ),
    bucket_name="my-bucket",
)

# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")


@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
    return x + 42
```
When using a storage block from a Prefect integration package, the package the storage block is imported from must be installed in all environments where the task will run.
For example, the prefect_aws package must be installed to use the S3Bucket storage block.
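For example, the integration package can be installed from PyPI in each environment:

```
pip install prefect-aws
```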