Syméon del Marmol
09/09/2025, 12:09 PM
Marvin
09/09/2025, 12:09 PM
Marvin
09/09/2025, 12:17 PM
If the backend process that called .delay() and is awaiting .result_async() restarts, those Python futures are gone. As long as you persisted the task run IDs, you can reattach to those runs and fetch results from Prefect.
Make task runs restart-safe
- Add retries and backoff to tasks so crashed runs are rescheduled:
from prefect import task
@task(
    retries=5,
    retry_delay_seconds=[10, 30, 60, 120, 300],  # backoff
    retry_jitter_factor=0.2,  # avoid thundering herd
)
def do_work(x: int) -> int:
    ...
- Persist results so you can read them after any process dies:
@task(persist_result=True)
def do_work(x: int) -> int:
    ...
- Use idempotent task logic or a cache key to de-duplicate in case of duplicate execution after a crash (e.g., check-if-done-before-doing, or a cache policy keyed by your job identifier).
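For example, a minimal sketch of de-duplication via a cache key (the job_id parameter and key format are illustrative assumptions, not from your code):
from prefect import task
from prefect.context import TaskRunContext

def job_cache_key(context: TaskRunContext, parameters: dict) -> str:
    # Same business job -> same key -> Prefect reuses the prior result
    return f"job-{parameters['job_id']}"

@task(cache_key_fn=job_cache_key, persist_result=True)
def do_work(job_id: str, x: int) -> int:
    ...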
Running task workers on Kubernetes
- Run multiple replicas of a single “task worker” Deployment using the CLI:
prefect task serve ./path/to/tasks.py:do_work --limit 20
You can verify the options with prefect task serve --help. Multiple replicas are safe; they coordinate via the API queue. If one dies, another will pick up the run.
- Add liveness/readiness probes and a PodDisruptionBudget so at least one worker stays up during rolling updates.
- Use an external result store (e.g., S3/GCS) for durability so result_async() can always retrieve results. You can set a default result storage at the flow/task level or via settings/blocks.
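For example, a minimal sketch of attaching S3 result storage at the task level (this assumes the prefect-aws package and an S3Bucket block you've already saved; the block name "results" is hypothetical):
from prefect import task
from prefect_aws.s3 import S3Bucket  # requires the prefect-aws package

# "results" is a hypothetical block name; create it once with S3Bucket(...).save("results")
s3 = S3Bucket.load("results")

@task(persist_result=True, result_storage=s3)
def do_work(x: int) -> int:
    ...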
Fan-out pattern with .delay() and dynamic names
- Your .with_options(name=...) + .delay() + asyncio.gather pattern is fine. To survive backend restarts, capture and persist the run IDs for every delayed task alongside your own business job IDs.
- On restart, reattach by polling the run state by ID and materializing the result. A pattern you can adopt:
import asyncio
from prefect import get_client
from prefect.states import State, get_state_result

async def wait_for_task_result(task_run_id: str, poll: float = 2.0):
    async with get_client() as client:
        while True:
            tr = await client.read_task_run(task_run_id)
            state: State = tr.state
            if state.type.value in {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}:
                break
            await asyncio.sleep(poll)
    # COMPLETED -> returns the persisted result;
    # all other terminal states -> raises the original exception if one was captured
    return get_state_result(state)
Notes:
- Ensure persist_result=True on the task; otherwise results may only exist in the original process memory.
- Persist the task run IDs returned by .delay() so you can call wait_for_task_result(task_run_id) after a restart.
- If you prefer push over pull, you can also have the task send a webhook when it completes, so your backend consumes that event instead of polling.
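As a sketch of capturing those IDs at submission time (db and jobs are stand-ins for your own persistence layer and work items; the future returned by .delay() exposes the run ID as task_run_id):
futures = []
for i, item in enumerate(jobs):  # 'jobs' is a hypothetical list of work items
    future = do_work.with_options(name=f"do-work-{i}").delay(item)
    # Persist the Prefect task run ID next to your own business job ID
    db.save_task_run_id(job_id=item.id, task_run_id=str(future.task_run_id))
    futures.append(future)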
Triggering flows resiliently
Marvin
09/09/2025, 12:17 PM
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

# Trigger a flow deployment
fr = await run_deployment("project/my-flow-deployment", parameters={"x": 1})
# Persist fr.id in your DB

# Later (or across restarts), reattach:
final_run = await wait_for_flow_run(fr.id)  # returns the FlowRun in its terminal state
- run_deployment and wait_for_flow_run are designed to work across process restarts and were verified to exist in 3.x:
- API reference: deployments
- API reference: flow_runs
Operational guardrails for restarts
- Concurrency limits to smooth restarts: set global or work queue concurrency limits so retries after a crash don't stampede (see the one-liner after this list).
- Use jitter on retries (retry_jitter_factor) to stagger re-execution.
- Ensure your Kubernetes Deployment for task workers and Prefect Server has proper probes and PDBs.
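For example, a tag-based task concurrency limit via the CLI (the tag name backend-jobs is an assumption; apply the same tag to your tasks with tags(...) or with_options(tags=[...])):
prefect concurrency-limit create backend-jobs 10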
A few clarifications that will help me tailor this:
- Are you using Prefect Cloud or Prefect Server (self-hosted)? If Server, what’s your backing database?
- Are your background tasks exclusively Prefect tasks (via .delay()), or do you also fan out to separate flow runs?
- Do you already have external result storage configured (S3/GCS/Azure), or should I share a quick setup?
References
- Built for resilience blog: Design patterns for resilient pipelines
- Idempotency best practices: Importance of idempotent pipelines
- Task CLI help (verified): prefect task serve --help
If you share a short code sample of how you're creating the tasks and awaiting them, I can annotate it with the minimal changes needed to capture run IDs, persist results, and reattach after restarts.
Syméon del Marmol
09/09/2025, 12:27 PM
Marvin
09/09/2025, 12:27 PM
Marvin
09/09/2025, 12:30 PM
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect import tags
# Your flow
from my_flows import my_flow
async def launch_job(job_id: str, params: dict) -> str:
    # Tag the run so it can be discovered if you ever lose the ID
    with tags(f"job:{job_id}", "origin:backend"):
        flow_run = await submit_to_runner(my_flow, parameters=params)
    # Persist mapping job_id -> flow_run_id to your DB
    db.save_mapping(job_id=job_id, flow_run_id=str(flow_run.id))
    return str(flow_run.id)
- Reattach after a backend restart
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
async def resume_tracking(job_id: str):
    flow_run_id = db.get_mapping(job_id)
    flow_run = await wait_for_flow_run(flow_run_id)  # waits until terminal state
    if flow_run.state.is_completed():
        result = get_state_result(flow_run.state)  # reads from your S3 result storage
        return result
    else:
        # FAILED/CRASHED/CANCELLED -> raises underlying exception if present
        return get_state_result(flow_run.state)
- If you didn’t persist the ID (reconstruction by tag)
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterTags

async def find_flow_run_ids_for_job(job_id: str):
    async with get_client() as client:
        # Query the most recent runs carrying your correlation tag;
        # you can also filter by time window / state
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(tags=FlowRunFilterTags(all_=[f"job:{job_id}"])),
            limit=50,
        )
        return [str(fr.id) for fr in runs]
Why this is resilient to pod restarts
- Workers: if a worker pod dies mid-task, Prefect will mark the task run Crashed; with retries configured on tasks, it transitions to AwaitingRetry and is picked up by any healthy worker replica when due.
- Backend: your backend futures go away on a restart, but your DB has job_id -> flow_run_id. You reattach using wait_for_flow_run(flow_run_id) and materialize the result from S3.
Recommended hardening you already mostly have
- Tasks idempotent: great. Keep retries with backoff+jitter on tasks to auto-recover from crashes.
- Persist results to S3: ensures get_state_result can read results after any process dies.
- Add tags like job:{job_id} and origin:backend so you can query or drive Automations.
- Use multiple replicas of prefect task serve workers (with readiness/liveness probes and a PodDisruptionBudget) to ride through rolling updates.
Notes
- Calling a flow function directly in-process will not be resilient to a backend restart. Submitting with submit_to_runner creates a tracked flow run in Prefect and immediately returns the FlowRun object (with ID) so you can persist and reattach.
- If you do switch to deployments later, run_deployment behaves similarly (returns a FlowRun quickly), but you don't need it for reactivity if Runner fits your needs.
Docs for reference
- Flow runs API helpers: prefect.flow_runs
- Runner module: prefect.runner
- Tasks (retries, persistence): prefect.tasks
If you share your current submission snippet, I can adapt it to use submit_to_runner and add the minimal persistence (and tag) hooks to make restart recovery plug-and-play.
Syméon del Marmol
09/09/2025, 12:50 PM
Marvin
09/09/2025, 12:50 PM
Marvin
09/09/2025, 12:51 PM
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
async def handle_completion(flow_run_id: str):
    fr = await wait_for_flow_run(flow_run_id, log_states=True)
    if fr.state.type.value == "CRASHED":
        # Detached/crash -> re-submit with your policy
        params = db.get_params_for(flow_run_id)
        await resubmit_with_same_params(params)
        return
    if fr.state.type.value == "FAILED":
        # Keep the failure
        exc = None
        try:
            get_state_result(fr.state)  # will raise the original exception
        except Exception as e:
            exc = e
        record_failure(flow_run_id, exc)
        return
    # COMPLETED, CANCELLED, etc.
    result = get_state_result(fr.state)
    record_success(flow_run_id, result)
Option B: Use an Automation on “Crashed” events
- Create an Automation with an Event trigger on flow-run crashed and a tag filter (e.g., origin:backend).
- Action: call a webhook in your backend that re-submits the same job parameters. Do nothing on Failed events.
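A minimal sketch of such a webhook receiver, assuming FastAPI on your side and the hypothetical db/resubmit helpers from earlier (the payload shape depends on how you template the Automation's webhook body):
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/prefect/flow-run-crashed")  # point the Automation's webhook action here
async def on_flow_run_crashed(request: Request):
    event = await request.json()
    # Assumes your templated payload includes the crashed flow run's ID
    flow_run_id = event["flow_run_id"]
    params = db.get_params_for(flow_run_id)  # your persistence layer
    await resubmit_with_same_params(params)  # your re-submission policy
    return {"ok": True}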
Links:
- API reference: prefect.automations
- Events overview: Events
What about tasks inside flows?
- Tasks can have conditional retries with retry_condition_fn, which lets you suppress retries on "application errors" you don't want retried.
- Example: only retry on infrastructure/transient exceptions you whitelist; let business logic errors fail immediately.
from prefect import task
from prefect.exceptions import InfrastructureError

# retry_condition_fn receives the task, the task run, and the Failed state;
# return True to retry, False to keep the failure
def retry_only_on_transient(task, task_run, state) -> bool:
    try:
        state.result()  # re-raises the captured exception, if any
    except (InfrastructureError, TimeoutError, ConnectionError):
        return True  # whitelist infrastructure-like/transient errors
    except Exception:
        return False  # business logic errors fail immediately
    return False

@task(
    retries=5,
    retry_delay_seconds=[5, 10, 20, 40, 60],
    retry_jitter_factor=0.2,
    retry_condition_fn=retry_only_on_transient,
    persist_result=True,
)
def do_work(x: int) -> int:
    ...
Note: If the process dies, there’s no Python exception for that attempt; the task/flow run will be marked CRASHED by the server. The retry_condition_fn only runs when an exception is captured. In practice:
- Use task-level retry_condition_fn to control retries for in-process exceptions.
- Use state.type branching (CRASHED vs FAILED) at the flow level (or in your supervisor) to decide on re-submission when a pod/runner disappears.
Useful API bits
- State types: prefect.states
- Read a flow run and inspect its state
from prefect import get_client
async def is_crashed(flow_run_id: str) -> bool:
    async with get_client() as client:
        fr = await client.read_flow_run(flow_run_id)
        return fr.state.type.value == "CRASHED"
- You can also inspect fr.state.message to differentiate "lost heartbeat" style crashes from others if you need finer-grained routing.
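For instance, a rough sketch of that routing (the substring check is an assumption about the crash message's wording, not a stable API):
def is_lost_heartbeat(state) -> bool:
    # Crash messages are free-form text, so match defensively
    return "heartbeat" in (state.message or "").lower()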
Given your setup (self-hosted, flows submitted reactively, S3 result storage, idempotent tasks), the simplest and most robust approach is:
- Persist flow_run_id at submission time.
Marvin
09/09/2025, 12:51 PM
Syméon del Marmol
09/09/2025, 1:21 PM
Marvin
09/09/2025, 1:21 PM
Marvin
09/09/2025, 1:21 PM
If you submit the subflow as a separate run (e.g., via run_deployment or submit_to_runner from inside the flow), then killing the parent's process will typically mark only the parent CRASHED; the child can continue running because it's in another process/pod. There's no automatic cascade unless you add one (e.g., an Automation that cancels children when the parent crashes).
What you can do with this
- If you want to re-run only the parent after an in-process crash, you can key off the parent’s state.type == CRASHED and resubmit just the root flow.
- If you use remote subflows and want cascading behavior, add an Automation on “flow-run crashed” for the parent that:
- finds children by parent_flow_run_id (or by a shared tag), then
- cancels them or triggers your retry policy.
- Conversely, if you want children to continue independently, keep them remote and avoid cascaded cancellation.
Finding children by parent
- Each subflow run records parent_flow_run_id and root_flow_run_id. You can query with the client for children if needed and decide how to handle them after a parent crash (see the sketch below).
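A sketch of that query, assuming your Prefect version exposes a parent_flow_run_id filter in prefect.client.schemas.filters (verify the class name against your installed version; a shared tag plus the tag filter shown earlier works as a fallback):
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterParentFlowRunId

async def find_child_run_ids(parent_flow_run_id) -> list[str]:
    async with get_client() as client:
        children = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id=FlowRunFilterParentFlowRunId(any_=[parent_flow_run_id])
            )
        )
        return [str(fr.id) for fr in children]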
References
- State types: prefect.states
- Runner helpers: prefect.runner
If you share how you're invoking the subflow (in-process vs submitting another run), I can give you concrete snippets for detection and the Automation/cancellation pattern.
Syméon del Marmol
09/10/2025, 8:31 AM
Message: "Task run failed with exception: RuntimeError('Failed to connect to the `Runner` webserver. Ensure that the server is running and reachable. You can run the webserver either by starting your `serve` process with `webserver=True`, or by setting `PREFECT_RUNNER_SERVER_ENABLE=True`.')"
What should I enable?
Is the approach you described valid with flows that are not deployed?
Marvin
09/10/2025, 8:31 AM
Marvin
09/10/2025, 8:35 AM
submit_to_runner needs a Runner webserver to be running and reachable. Right now, there isn't one (or it's not reachable from your submitting process), so the submission can't be accepted.
What to enable
- Start a Runner with its webserver enabled in the process that will execute your flows:
- In Python: pass webserver=True when you start your serve process
- Or via env: set PREFECT_RUNNER_SERVER_ENABLE=true in that process/pod
Practical ways to run it
- Python API (useful in your Kubernetes “runner” pod):
from prefect.flows import serve
from my_flows import my_flow # the flow(s) you plan to submit
if __name__ == "__main__":
    # Start a Runner, enable its webserver, and serve these flows
    serve(my_flow, limit=20, webserver=True)
- CLI (if you prefer):
# In the pod that will execute flows
# Set the env var so the Runner’s webserver is started
export PREFECT_RUNNER_SERVER_ENABLE=true
prefect flow serve ./path/to/file.py:my_flow -n my-flow-deployment
Note: the CLI doesn't expose a --webserver flag; use the env var as shown.
Networking in Kubernetes
- By default the webserver binds to localhost. If your submitter runs in a different pod, expose the Runner webserver through a Service and set the Runner server settings so it binds to a routable address (e.g., 0.0.0.0) and your submitter knows where to reach it.
- Set the Runner server settings via env (e.g., PREFECT_RUNNER_SERVER_ENABLE=true, plus host/port if you need to change the bind address). If you want, I can share a minimal K8s Deployment + Service manifest.
Is this valid without deployments?
- Yes. This Runner-based approach is specifically for flows that are not deployed. It remains reactive: submit_to_runner returns a FlowRun immediately and the run is executed by the Runner process that you've "served."
Minimal submitter example (your backend)
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
from my_flows import my_flow
# Submit reactively
fr = await submit_to_runner(my_flow, parameters={"x": 1})
# Persist fr.id somewhere durable, then reattach later if needed
final = await wait_for_flow_run(fr.id)
result = get_state_result(final.state) # reads from S3 since you persist results
Docs for reference
- Runner API: prefect.runner
- Flow runs helpers: prefect.flow_runs
- Settings (env): prefect.settings
If you share how your pods are arranged (which pod will host the Runner and which will submit), I can give you exact env vars and a small K8s manifest to make the webserver reachable between them.
Syméon del Marmol
09/10/2025, 8:57 AM
• the serve method does not take a webserver parameter
• the PREFECT_RUNNER_SERVER_ENABLE=true has no effect
I still get the error when trying to call the flow from a background task, using the submit_to_runner function.
Here is the code of my worker:
@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(30)
    print(f"dummy task done: {input}")

@task
async def dummy_task_serve():
    with tags("resilient"):
        flow_run: FlowRun = await submit_to_runner(long_flow, parameters={"input": "test"})
        await wait_for_flow_run(flow_run.flow_id)

if __name__ == "__main__":
    serve(dummy_task_serve)
I call this to trigger the background task from my other backend:
dummy_task_serve.delay()
=> What I'm trying to do is get the flow or background task rescheduled in case the worker restarts or crashes.
Marvin
09/10/2025, 8:57 AM
Marvin
09/10/2025, 8:59 AM
from prefect import flow, serve
@flow
async def long_flow(input: str):
    print(f"running: {input}")
    # ... your work ...

if __name__ == "__main__":
    # In this process/pod, set:
    #   PREFECT_RUNNER_SERVER_ENABLE=true
    # Optionally also set host/port and expose with a Service if other pods will connect.
    serve(long_flow, limit=10)
- In Kubernetes:
- Set env var PREFECT_RUNNER_SERVER_ENABLE=true in this pod.
- If the submitter runs in another pod, bind the webserver to 0.0.0.0 and expose it via a Service. Point your submitter at that URL. (We can share a minimal Deployment + Service if helpful.)
- Note: PREFECT_RUNNER_SERVER_ENABLE must be set in this “flow runner” pod, not just anywhere.
2) Run your task worker separately
- This is the pod that executes your background tasks (.delay()).
- It can call submit_to_runner as long as it can reach the Runner webserver over the network.
Your task code (fixed)
import asyncio
from prefect import task, flow, tags
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(30)
    print(f"dummy task done: {input}")

@task(retries=3, retry_delay_seconds=[10, 30, 60])  # optional resilience on the submitter task itself
async def dummy_task_serve():
    with tags("resilient"):
        fr = await submit_to_runner(long_flow, parameters={"input": "test"})
    # BUGFIX: wait for the flow RUN id, not the flow id
    final = await wait_for_flow_run(fr.id)
    result = get_state_result(final.state)  # reads from S3 if you persist results
    return result
Start your task worker with:
prefect task serve ./path/to/file.py:dummy_task_serve --limit 20
Reference (verified CLI): prefect task serve --help
3) Networking between submitter and runner
- If they’re different pods:
- Runner pod: set PREFECT_RUNNER_SERVER_ENABLE=true and bind to a routable address; expose with a Service.
- Submitter pod: point submit_to_runner at the Runner server URL (configure the Runner server URL setting for the client). Make sure both pods share the same Prefect API profile (PREFECT_API_URL/PREFECT_API_KEY) so the flow run is registered in the same backend.
- If they’re the same pod/process, submitting to localhost should “just work” once the Runner webserver is enabled.
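For the cross-pod case above, the relevant environment variables look like this (the host/port names come from Prefect's runner.server settings group; verify against your version's prefect.settings):
# Runner pod: enable the webserver and bind to a routable address
export PREFECT_RUNNER_SERVER_ENABLE=true
export PREFECT_RUNNER_SERVER_HOST=0.0.0.0
export PREFECT_RUNNER_SERVER_PORT=8080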
4) Your original question: Is this valid for non-deployed flows?
- Yes. This Runner pattern is explicitly for flows that are not deployed. You get reactive submissions via submit_to_runner, plus resiliency because the flow run is tracked by Prefect and results are persisted (S3 in your case).
Common pitfalls to check
- Make sure you are serving the flow(s) with the Runner (not the submitter task).
- Ensure PREFECT_RUNNER_SERVER_ENABLE=true is set in the Runner process/pod.
- Ensure the submitter can reach the Runner webserver (Service/URL, network policy).
Marvin
09/10/2025, 8:59 AM
Syméon del Marmol
09/10/2025, 9:13 AM
Marvin
09/10/2025, 9:13 AM
Marvin
09/10/2025, 9:15 AM
Since prefect.task_worker.serve(...) and prefect.flows.serve(...) are long-running/blocking loops, the simplest way is to start one in a background thread and the other in the main thread.
Key points
- You must start a Runner that serves the flow(s) you plan to submit. This is what submit_to_runner connects to.
- You can also serve your background "submitter" task in the same process via the Task Worker.
- Enable the Runner webserver via environment (there's no webserver= parameter on serve). This env var must be set in the process that calls prefect.flows.serve(...).
Minimal example (single process, two threads)
import os
import threading
import asyncio
from prefect import flow, task, tags
from prefect.task_worker import serve as serve_tasks
from prefect.flows import serve as serve_flows
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(5)
    print(f"dummy task done: {input}")

@task(retries=3, retry_delay_seconds=[5, 10, 20])
async def dummy_task_serve():
    # Submit a non-deployed flow reactively to the local Runner
    with tags("resilient"):
        fr = await submit_to_runner(long_flow, parameters={"input": "test"})
    # IMPORTANT: wait for the flow RUN id (fr.id), not the flow_id
    final = await wait_for_flow_run(fr.id)
    return get_state_result(final.state)

def start_task_worker():
    # Serve your background task so `.delay()` works
    serve_tasks(dummy_task_serve, limit=10)

def start_flow_runner():
    # Enable the Runner webserver so submit_to_runner can connect
    os.environ["PREFECT_RUNNER_SERVER_ENABLE"] = "true"
    # Optionally: set host/port via Prefect settings if you need a non-default bind
    serve_flows(long_flow, limit=10)

if __name__ == "__main__":
    # Start the Task Worker in a background thread
    t = threading.Thread(target=start_task_worker, daemon=True)
    t.start()
    # Run the Flow Runner (with webserver) in the main thread
    start_flow_runner()
How you’ll use it
- Your other backend can call dummy_task_serve.delay() to trigger the background task. That task will submit long_flow to the Runner that's running in the same process.
- If the pod restarts, Prefect will reschedule crashed runs (with your retries) and you can reattach by querying the flow_run_ids you persisted.
Why this fixes your earlier error
- Previously, you were serving only a task and submitting a flow, but there was no Runner webserver running to accept the submission. In this pattern, the Runner is started in the same process (and we explicitly enable the webserver), so submit_to_runner has something to talk to.
Notes and gotchas
- Threads are fine here; each “serve” loop manages its own async runtime. If you want stronger isolation, you can use two processes instead, but threads are simpler and meet your “single process” goal.
- If you split the submitter and runner across pods later, you’ll need to expose the Runner webserver via a Service and ensure the submitter can reach it.
- Be sure to persist fr.id (the flow run ID) when you submit, so your backend can reattach after restarts.
- In your original snippet you used wait_for_flow_run(flow_run.flow_id); that's wrong. Use fr.id instead.
Helpful references
- Runner API (submit_to_runner): prefect.runner
- Flow run helpers: prefect.flow_runs
- Task worker CLI (verified): prefect task serve
- Flow serve CLI: prefect flow serve
Marvin
09/10/2025, 9:15 AM
Syméon del Marmol
09/10/2025, 10:04 AM
File "/usr/lib/python3.11/asyncio/events.py", line 637, in get_child_watcher
    raise NotImplementedError
2. When trying to submit the flow using submit_to_runner from the background task, I see from the UI that it creates a task run with no name, that stays in "pending" state.
09/10/2025, 10:04 AM
Marvin
09/10/2025, 10:07 AM
# worker_entrypoint.py
import asyncio, os

try:
    # Install a child watcher so subprocesses spawned from threads don't blow up
    asyncio.get_event_loop_policy().set_child_watcher(asyncio.SafeChildWatcher())
except Exception:
    pass

os.execvp("prefect", ["prefect", "worker", "start", "-p", os.environ["PREFECT_WORK_POOL_NAME"]])
Then in your Docker Compose service for the worker, run:
command: ["python", "/app/worker_entrypoint.py"]
This makes the ProcessWorker safe to spawn subprocesses even when the call comes from a non‑main thread.
2) submit_to_runner from your background task → “task run with no name” stuck in Pending
This typically means your submission did not reach a live Runner webserver, or you’re looking at the wrong run in the UI.
Checklist to fix/confirm:
- Make sure the Runner webserver is actually enabled in the same process that is serving your flows. There’s no webserver= argument on serve; you must enable it via environment in that process before serve(...) starts.
- Set this env var in the container/process that runs prefect.flows.serve(...):
PREFECT_RUNNER_SERVER_ENABLE=true
- On startup, check logs for a message that the Runner webserver started. If you don’t see a clear “Runner webserver is running at …” log, it’s not enabled.
- Serve flows and tasks in the same process (your dev goal) with two threads, and enable the Runner server before serving flows:
import os, threading, asyncio
from prefect import flow, task, tags
from prefect.task_worker import serve as serve_tasks
from prefect.flows import serve as serve_flows
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(5)
    print(f"dummy task done: {input}")

@task
async def dummy_task_serve():
    with tags("resilient"):
        fr = await submit_to_runner(long_flow, parameters={"input": "test"})
    # IMPORTANT: wait on the flow RUN id, not flow_id
    final = await wait_for_flow_run(fr.id)
    return get_state_result(final.state)

def start_task_worker():
    serve_tasks(dummy_task_serve, limit=10)

def start_flow_runner():
    os.environ["PREFECT_RUNNER_SERVER_ENABLE"] = "true"
    serve_flows(long_flow, limit=10)

if __name__ == "__main__":
    t = threading.Thread(target=start_task_worker, daemon=True)
    t.start()
    start_flow_runner()
- If you still see "Pending":
Marvin
09/10/2025, 10:07 AM