Shubham
10/14/2025, 4:07 PM
Marvin
10/14/2025, 4:10 PM
- Task-level retries with a fixed delay:
from prefect import flow, task
@task(retries=3, retry_delay_seconds=5)
def flaky_task():
    # raise an exception to trigger a retry
    ...
- Exponential backoff with optional jitter:
from prefect import task
from prefect.tasks import exponential_backoff

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),  # 2, 4, 8, 16, 32 (seconds)
    retry_jitter_factor=0.2,  # allows each delay to vary by up to 20%
)
def network_call():
    ...
- Flow-level retries (retries the entire flow run if it fails):
from prefect import flow

@flow(retries=2, retry_delay_seconds=10)
def my_flow():
    ...
- Conditional retries (only retry for certain exceptions). Note that retry_condition_fn receives (task, task_run, state) rather than the raw exception:
from prefect import task

def retry_on_timeouts(task, task_run, state) -> bool:
    # return True to retry, False to fail immediately
    try:
        state.result()  # re-raises the exception the task failed with
    except TimeoutError:
        return True
    except Exception:
        return False
    return False

@task(retries=3, retry_delay_seconds=5, retry_condition_fn=retry_on_timeouts)
def sometimes_times_out():
    ...
A couple notes:
- Retries happen within the same flow run.
- If you want to “resume” a failed run by only re-executing failed work, consider enabling result persistence and caching so previously successful tasks can be skipped when you re-run the flow.
Docs for more detail:
- Task API (retries, backoff)
- Prefect blog: scaling and resilience patterns
Quick clarification: are you looking for automatic retries during the same run (as above), or a way to re-run only the failed task after the run has already finished?
Shubham
10/14/2025, 4:21 PM
Marvin
10/14/2025, 4:25 PM
1) Enable result persistence and input-based caching on the expensive tasks:
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(
    persist_result=True,                 # write results to storage
    cache_key_fn=task_input_hash,        # reuse results when inputs match
    cache_expiration=timedelta(days=7),  # keep results available across days
)
def heavy_step(x):
    ...

@flow
def my_flow(x):
    a = heavy_step(x)
    b = heavy_step(x + 1)  # example of another cached step
    ...
2) Configure where results are stored (so they’re available across runs). For example, use a remote bucket:
from prefect import flow
from prefect.filesystems import RemoteFileSystem
storage = RemoteFileSystem(basepath="s3://my-bucket/prefect-results")

@flow(result_storage=storage)
def my_flow(x):
    ...
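Optionally, save that filesystem as a block so flows can load it by name; the block name "results-storage" below is just an example, and note that s3:// paths require the s3fs package:
storage.save("results-storage")  # one-time: persist the config with the Prefect server

# later, anywhere else:
from prefect.filesystems import RemoteFileSystem
storage = RemoteFileSystem.load("results-storage")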
3) Re-run the flow with the same parameters the next day (UI or CLI). Successful tasks will be skipped due to cached results; the failed task(s) will execute.
- CLI example (verified):
prefect deployment run my-flow/prod -p x=1 --watch
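If you'd rather trigger the re-run from Python instead of the CLI, run_deployment does the same thing; the deployment name and parameter below are illustrative:
from prefect.deployments import run_deployment

# starts a new run of the deployment; cached tasks are skipped,
# failed/new work executes
flow_run = run_deployment(
    name="my-flow/prod",   # illustrative deployment name
    parameters={"x": 1},
    timeout=0,             # 0 = return immediately rather than waiting
)
print(flow_run.id)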
Notes
- Retries (retries=...) are for automatic re-attempts within the same flow run. What you’re asking for is a new run that only re-executes failed work; caching + persisted results is the pattern for that.
- If you map over many items, input-hash caching ensures only the items that failed (or are new) will re-run on the next execution; see the sketch after these notes.
- Choose a cache_expiration that exceeds your typical “resume” window.
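A minimal sketch of that mapped case, reusing heavy_step from above:
from prefect import flow

@flow
def my_mapped_flow(items: list[int]):
    # each mapped task run derives its cache key from its own input, so a
    # re-run only executes items without a persisted result
    return heavy_step.map(items)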
Docs
- Caching and persisted results
- Task API (persist_result, cache_key_fn, cache_expiration)
Want me to help apply this to your specific flow? If you can share how the failing task is called and where you’d like to store results (local, S3, GCS, etc.), I can sketch the exact code.
Shubham
10/14/2025, 6:04 PM
Marvin
10/14/2025, 6:06 PM
Suspending a flow run until someone supplies input (the run releases its infrastructure while it waits):
from pydantic import BaseModel
from prefect import flow
from prefect.flow_runs import suspend_flow_run
class Approval(BaseModel):
    approved: bool
    note: str | None = None

@flow
def my_flow():
    # ... do some work
    approval = suspend_flow_run(
        wait_for_input=Approval,  # UI/API will require these fields
        key="gate-1",             # useful if you have multiple gates
        timeout=24 * 3600,        # optional: auto-timeout after 24h
    )
    if not approval or not approval.approved:
        raise RuntimeError("Not approved")
    # continue workflow...
Supplying the input (to resume)
- In the UI: open the flow run (it will show “Awaiting input”), fill the form, click Resume.
- Programmatically:
from prefect.flow_runs import resume_flow_run

resume_flow_run(
    flow_run_id="00000000-0000-0000-0000-000000000000",
    run_input={"approved": True, "note": "LGTM"},
)
Scaling to hundreds of waiting flows
- This is a supported use case. Paused/Suspended runs are persisted by the orchestrator; workers are not tied up while they wait.
- When resumed, the scheduler queues the run for workers again.
- You can add a timeout (as above) and/or use Automations to act on long-waiting runs (e.g., auto-cancel, notify, or auto-resume on an external event).
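If you later need to report on or bulk-manage the waiting runs, here is a minimal sketch of querying them via the Python client (assuming the "Paused"/"Suspended" state names that pause/suspend set):
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)

async def list_waiting_runs():
    # fetch all flow runs currently paused or suspended
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Paused", "Suspended"])
                )
            )
        )
    for run in runs:
        print(run.id, run.name, run.state.name)

asyncio.run(list_waiting_runs())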
Related APIs and docs
- REST API: Resume a flow run — resume-flow-run
- Python helpers for pausing/suspending and inputs (module index): prefect.input and prefect.flow_runs
If you share your exact approval UX (UI-only vs programmatic, expected fields, timeouts), I can tailor the code and an automation to match.