Syméon del Marmol
08/29/2025, 1:30 PM
Marvin
08/29/2025, 1:33 PM
Background tasks started with Task.delay() run inside the parent flow’s job. If that job dies, those background task runs die too. They are not reassigned independently; they only re-run when the parent flow run is retried.
How to get resilient behavior
- Flow-level retries: If the job crashes, a flow retry will enqueue a new attempt in your work pool and any available worker can pick it up.
- Task-level retries: Handle transient task errors while the job is still alive. If the entire job dies, task retries inside that job won’t help; rely on flow retries.
- Persist results + caching: To avoid redoing completed work after a retry, set persist_result=True and a cache_key_fn with a cache_expiration. On a flow retry, completed tasks can be skipped via cache rather than re-executed.
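To make the interaction between flow retries and cached results concrete, here is a minimal conceptual sketch in plain Python. It does not use Prefect: the dict `result_store`, the helper names, and the simulated crash are all hypothetical stand-ins for persisted results, a cached task, and a dying job.

```python
# Conceptual model only: a dict stands in for persisted result storage.
result_store = {}
calls = []  # records which inputs were actually computed


def cached_step(x):
    """Return a stored result if one exists, otherwise compute and store it."""
    key = f"step-{x}"  # stable key derived only from the input
    if key in result_store:
        return result_store[key]
    calls.append(x)
    value = x * 2
    result_store[key] = value
    return value


def run_flow(items, crash_after=None):
    """Run every step from the start; optionally crash before a given index."""
    results = []
    for n, item in enumerate(items):
        if crash_after is not None and n == crash_after:
            raise RuntimeError("simulated job crash")
        results.append(cached_step(item))
    return results


def run_with_retries(items, retries=2):
    """Retry the whole flow; only the first attempt crashes in this sketch."""
    for attempt in range(retries + 1):
        try:
            return run_flow(items, crash_after=2 if attempt == 0 else None)
        except RuntimeError:
            if attempt == retries:
                raise


results = run_with_retries([1, 2, 3])
```

The second attempt starts again from the first item, but the two steps that finished before the crash are served from the store, so each input is computed only once.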
Example pattern
import asyncio
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(
    retries=3,
    retry_delay_seconds=10,
    persist_result=True,
    # Key on the task's inputs only, so the key is identical across flow retries.
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
)
def do_work(x):
    ...


@flow(retries=2, retry_delay_seconds=60)
async def my_flow(items):
    # Launch background tasks with delay()
    handles = [do_work.with_options(name=f"work-{i}").delay(i) for i in items]
    # Await results concurrently
    results = await asyncio.gather(*(h.result_async() for h in handles))
    return results
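For caching to skip work on a flow retry, the cache key has to come out identical for identical inputs on every attempt. Below is a minimal sketch of a custom key function, assuming Prefect calls cache_key_fn with the task run context and a dict of the task's parameters. The name `stable_cache_key` and the `my-task-` prefix are hypothetical, and the sketch uses only the standard library so it can be checked in isolation.

```python
import hashlib
import json


def stable_cache_key(context, parameters):
    """Build a key from the parameters only.

    The context argument is deliberately ignored: it describes one specific
    run, so including it would produce a different key on every attempt.
    """
    # sort_keys makes the serialization independent of argument order;
    # default=str is a fallback for values json cannot encode directly.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"my-task-{digest}"


key_a = stable_cache_key(None, {"x": 1, "y": "a"})
key_b = stable_cache_key(object(), {"y": "a", "x": 1})  # other context, same inputs
key_c = stable_cache_key(None, {"x": 2, "y": "a"})      # different inputs
```

Here `key_a` and `key_b` match while `key_c` differs, which is the property a retry relies on. A function like this could be passed as cache_key_fn=stable_cache_key, provided the parameters serialize deterministically.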
Infrastructure nuances
- Process/local: if the Python process dies, flow and background tasks die; rely on flow retries.
- Docker/Kubernetes workers: the default job templates use “no restart” policies; containers/pods won’t restart automatically. Use Prefect retries for rescheduling. If you force infra-level restarts, you’ll essentially re-run the flow process from scratch—still no mid-graph resume.
Observability and automations
- Prefect detects “zombie” (non-heartbeating) runs and marks them Crashed: Detect zombie flows
- Configure retries: Retries guide
- Persisted results: Results persistence
- Caching: Task caching
To tailor guidance:
- Which worker/infrastructure are you using (process, Docker, Kubernetes)?
- For your Task.delay() usage, do you want to cache results between flow retries to skip recomputation? If so, I can help you wire in persist_result and a stable cache_key_fn aligned with your inputs.