<@ULVA73B9P> How does prefect deal with worker res...
# ask-marvin
s
@Marvin How does Prefect deal with a worker restarting for any reason while it is running a background task or a flow? Will that task be reassigned to another running worker, or rescheduled?
m
thought for 183.4 seconds
Great question. Short answer: Prefect won’t “move” an in-flight run to another worker mid-execution. What happens depends on whether the infrastructure job is still alive.

Key behaviors in Prefect 3.x (you’re on 3.4.14):
- Workers just launch infrastructure jobs. Once a flow run has started, it’s the job (process/container/pod) that executes the run, not the worker. If a worker restarts, any already-launched jobs keep running unaffected.
- If the job dies (e.g., process crash, container/pod exit), the flow stops heartbeating and the server marks the flow run Crashed after the zombie window. It is not automatically continued on another worker. To have it run again, you need retries configured.
- Background tasks created via `Task.delay()` run inside the parent flow’s job. If that job dies, those background task runs die too. They are not reassigned independently. They’ll only re-run when the parent flow run is retried.

How to get resilient behavior
- Flow-level retries: If the job crashes, a flow retry will enqueue a new attempt in your work pool and any available worker can pick it up.
- Task-level retries: Handle transient task errors while the job is still alive. If the entire job dies, task retries inside that job won’t help; rely on flow retries.
- Persist results + caching: To avoid redoing completed work after a retry, set `persist_result=True` and a `cache_key_fn` with a `cache_expiration`. On a flow retry, completed tasks can be skipped via cache rather than re-executed.

Example pattern
```python
import asyncio
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(
    retries=3,
    retry_delay_seconds=10,
    persist_result=True,
    # Derive the key from the task and its inputs so it stays the same across
    # flow retries (a key built from the run context would change every run)
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
)
def do_work(x):
    ...

@flow(retries=2, retry_delay_seconds=60)
async def my_flow(items):
    # Submit background tasks with delay()
    handles = [do_work.with_options(name=f"work-{i}").delay(i) for i in items]
    # Await the results concurrently
    results = await asyncio.gather(*(h.result_async() for h in handles))
    return results
```
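To make the "retry from scratch, but skip completed work" idea concrete, here is a toy simulation in plain Python. It does not use Prefect at all: `STORE`, `cached_work`, `run_flow`, and `run_with_retries` are hypothetical names invented for this sketch, and the temp-directory store only stands in for persisted results. It is meant as an illustration of the pattern, not of how Prefect implements it.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical stand-in for a result store; Prefect's real storage differs.
STORE = Path(tempfile.mkdtemp())
calls = []  # records which inputs were actually computed


def cached_work(x):
    """Compute x * 2 once; later calls read the persisted result."""
    path = STORE / f"work-{x}.json"
    if path.exists():
        return json.loads(path.read_text())
    calls.append(x)
    result = x * 2
    path.write_text(json.dumps(result))
    return result


def run_flow(items, crash_after=None):
    """Process items in order; optionally simulate the job dying partway."""
    results = []
    for i, x in enumerate(items):
        if crash_after is not None and i == crash_after:
            raise RuntimeError("simulated job crash")
        results.append(cached_work(x))
    return results


def run_with_retries(items, retries=2):
    """The first attempt crashes after two items; each retry restarts the flow."""
    for attempt in range(retries + 1):
        try:
            return run_flow(items, crash_after=2 if attempt == 0 else None)
        except RuntimeError:
            continue
    raise RuntimeError("all attempts failed")


results = run_with_retries([1, 2, 3])
print(results)  # [2, 4, 6]
print(calls)    # [1, 2, 3]: items 1 and 2 were computed only once
```

The retry re-runs the whole flow function, yet only the item that never finished is recomputed, which is the effect persisted results plus a stable cache key are meant to give you.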
Infrastructure nuances
- Process/local: if the Python process dies, flow and background tasks die; rely on flow retries.
- Docker/Kubernetes workers: the default job templates use “no restart” policies; containers/pods won’t restart automatically. Use Prefect retries for rescheduling. If you force infra-level restarts, you’ll essentially re-run the flow process from scratch; there is still no mid-graph resume.

Observability and automations
- Prefect detects “zombie” (non-heartbeating) runs and marks them Crashed: Detect zombie flows
- Configure retries: Retries guide
- Persisted results: Results persistence
- Caching: Task caching

To tailor guidance:
- Which worker/infrastructure are you using (process, Docker, Kubernetes)?
- For your `Task.delay()` usage, do you want to cache results between flow retries to skip recomputation? If so, I can help you wire in `persist_result` and a stable `cache_key_fn` aligned with your inputs.
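As a starting point for a stable key, here is a minimal sketch that hashes only the task inputs. `stable_key` is a hypothetical helper, not part of Prefect, and it assumes the inputs can be serialized to JSON (anything else falls back to `str()`). Prefect calls `cache_key_fn` with the run context and a dict of parameters, so a helper like this could be wrapped as `lambda context, parameters: stable_key("do_work", parameters)`; the built-in `task_input_hash` from `prefect.tasks` covers the common case without custom code.

```python
import hashlib
import json


def stable_key(prefix, parameters):
    """Return a deterministic key for a dict of JSON-serializable inputs."""
    # sort_keys makes the key independent of argument order;
    # default=str is a fallback for values json cannot encode directly.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{prefix}-{digest[:16]}"


key_a = stable_key("do_work", {"x": 1, "y": 2})
key_b = stable_key("do_work", {"y": 2, "x": 1})  # same inputs, different order
key_c = stable_key("do_work", {"x": 1, "y": 3})  # different inputs

print(key_a == key_b)  # True
print(key_a == key_c)  # False
```

Because the key depends only on the prefix and the inputs, it is identical on the first attempt and on any later flow retry, so a retried run can find the earlier result.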