Set the number of retries
To rerun your workflow when an exception is raised, set the retries
parameter to an integer.
from prefect import flow
@flow(retries=3)
def troublesome_workflow():
raise Exception("oopsie daisy")
Wait between retries
To add a wait between retries, pass a value to the retry_delay_seconds
parameter.
from prefect import flow
@flow(retries=3, retry_delay_seconds=2.7182)
def troublesome_workflow():
raise Exception("oopsie daisy")
Retry parts of a workflow
You can configure retries for individual tasks in a workflow to limit the scope of a retry.
import random
import httpx
from prefect import flow, task
@task
def consistent_task() -> dict:
print("I won't rerun even if the task after me fails")
@task(retries=4, retry_delay_seconds=5)
def mercurial_task() -> dict:
# Will fail half of the time
url = f"https://httpbin.org/status/{random.choice([200, 500])}"
response = httpx.get(url)
response.raise_for_status()
@flow
def my_workflow():
consistent_task()
mercurial_task()
Retry with configurable delay
A task’s retry delays can also be defined as a list of integers for different delays between retries.
from prefect import task
@task(retries=4, retry_delay_seconds=[1, 2, 4, 8])
def melancholic_task():
raise Exception("We used to see each other so much more often")
You can also use the exponential_backoff
utility to generate a list of retry delays that correspond to an exponential backoff retry strategy.
from prefect import task
from prefect.tasks import exponential_backoff
@task(retries=4, retry_delay_seconds=exponential_backoff(backoff_factor=2))
def melancholic_task():
raise Exception("We used to see each other so much more often")
Retry with a custom condition
Whether or not a task should be retried can be determined dynamically by passing a callable to the retry_condition_fn
parameter.
import httpx
from prefect import flow, task
def retry_handler(task, task_run, state) -> bool:
"""
Retry handler that skips retries if the HTTP status code is 401 or 404.
"""
try:
state.result()
except httpx.HTTPStatusError as exc:
do_not_retry_on_these_codes = [401, 404]
return exc.response.status_code not in do_not_retry_on_these_codes
except httpx.ConnectError:
return False
except:
return True
@task(retries=1, retry_condition_fn=retry_handler)
def api_call_task(url):
response = httpx.get(url)
response.raise_for_status()
return response.json()
@flow
def my_workflow(url):
api_call_task(url=url)
If a callable passed to retry_condition_fn
returns True
, the task will be retried. Otherwise, the task will exit with an exception.
Add jitter to retry delays
To add a random amount of time to retry delays, pass a value to the retry_jitter_factor
parameter.
import time
from prefect import task
from prefect.tasks import exponential_backoff
last_run_time = time.time()
@task(
retries=3,
retry_delay_seconds=exponential_backoff(backoff_factor=5),
retry_jitter_factor=3,
)
def some_task_with_exponential_backoff_retries():
global last_run_time
print(f"Time between retries: {time.time() - last_run_time}")
if time.time() - last_run_time < 10:
last_run_time = time.time()
raise Exception("I could fail some more")
return "That's enough failure"
Adding jitter to the retry delays avoids multiple tasks introducing load to external systems by failing and retrying at the exact same cadence.
You can set the default retries and retry delays for all tasks via Prefect’s settings.
The default values can be overridden on a per-task basis via the retries
and retry_delay_seconds
parameters.
prefect config set PREFECT_TASK_DEFAULT_RETRIES=2
prefect config set PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS="1,10,100"