# ask-marvin
s
@Marvin I'm using Kubernetes infrastructure to run task worker and backend replicas, calling flows from background tasks. How can I make the system resilient to restarts of workers and backends (either because of a crash or a newly deployed pod), with the newly started worker resuming the task?
m
thought for 471.8 seconds
Great question. Assuming you’re on Prefect 3.x, here’s how to make both your task workers and your “backend” process resilient to pod restarts in Kubernetes while keeping your fan-out pattern with background tasks.
High-level principles
- Persist orchestration state and results in the Prefect API + durable result storage so work can resume on a different pod.
- Rely on retries and idempotency for tasks so “crashed” work gets rescheduled safely.
- Never rely on in-memory Python futures alone. Store run IDs so you can reattach after your backend restarts.
What happens on restarts
- If a task worker pod dies mid-task, Prefect will eventually mark the task run Crashed. If the task has retries configured, it will transition to AwaitingRetry then be picked up by any healthy worker replica when due.
- If your backend process (that called .delay() and is awaiting .result_async()) restarts, those Python futures are gone. As long as you persisted the task run IDs, you can reattach to those runs and fetch results from Prefect.
Make task runs restart-safe
- Add retries and backoff to tasks so crashed runs are rescheduled:
Copy code
from prefect import task

@task(
    retries=5,
    retry_delay_seconds=[10, 30, 60, 120, 300],  # backoff
    retry_jitter_factor=0.2,                     # avoid thundering herd
)
def do_work(x: int) -> int:
    ...
- Persist results so you can read them after any process dies:
Copy code
@task(persist_result=True)
def do_work(x: int) -> int:
    ...
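For durability across pods you can also point result persistence at external storage explicitly. A minimal sketch, assuming the prefect-aws collection is installed and an S3Bucket block has already been saved (the "results" block name is a placeholder):
Copy code
from prefect import task
from prefect_aws.s3 import S3Bucket

# "results" is a placeholder block name; create and save the block beforehand
s3_results = S3Bucket.load("results")

@task(persist_result=True, result_storage=s3_results)
def do_work(x: int) -> int:
    ...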
- Use idempotent task logic or a cache key to de-duplicate in case of duplicate execution after a crash (e.g., check-if-done-before-doing, or a cache policy keyed by your job identifier).
Running task workers on Kubernetes
- Run multiple replicas of a single “task worker” Deployment using the CLI:
Copy code
prefect task serve ./path/to/tasks.py:do_work --limit 20
This command is verified by prefect task serve --help. Multiple replicas are safe; they coordinate via the API queue. If one dies, another will pick up the run.
- Add liveness/readiness probes and a PodDisruptionBudget so at least one worker stays up during rolling updates.
- Use an external result store (e.g., S3/GCS) for durability so result_async() can always retrieve results. You can set a default result storage at the flow/task level or via settings/blocks.
Fan-out pattern with .delay() and dynamic names
- Your .with_options(name=...) + .delay() + asyncio.gather pattern is fine. To survive backend restarts, capture and persist the run IDs for every delayed task alongside your own business job IDs.
- On restart, reattach by polling the run state by ID and materializing the result. A pattern you can adopt:
Copy code
import asyncio
from prefect import get_client
from prefect.states import State, get_state_result

async def wait_for_task_result(task_run_id: str, poll=2.0):
    async with get_client() as client:
        while True:
            tr = await client.read_task_run(task_run_id)
            state: State = tr.state
            if state.type.value in {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}:
                break
            await asyncio.sleep(poll)

        # get_state_result returns the value for COMPLETED states and re-raises
        # the captured exception for other terminal states (FAILED/CRASHED/CANCELLED)
        return get_state_result(state)
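How the submitting side might use this, sketched with a placeholder db object standing in for your own persistence layer (the job_id key and db helper names are assumptions):
Copy code
# At submission time: .delay() returns a future carrying the task run ID
future = do_work.delay(42)
db.save_task_run_id(job_id="job-123", task_run_id=str(future.task_run_id))

# Later, possibly in a freshly restarted backend pod:
result = await wait_for_task_result(db.get_task_run_id("job-123"))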
Notes:
- Ensure persist_result=True on the task; otherwise results may only exist in the original process memory.
- Persist the task run IDs returned by .delay() so you can call wait_for_task_result(task_run_id) after a restart.
- If you prefer push vs pull, you can also have the task send a webhook when it completes, then your backend consumes that event instead of polling.
Triggering flows resiliently
- Prefer calling deployments for flows instead of running flows in-process. This decouples the flow run from your backend’s lifecycle. Example:
Copy code
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

# Trigger a flow deployment
fr = await run_deployment("project/my-flow-deployment", parameters={"x": 1})

# Persist fr.id in your DB
# Later (or across restarts), reattach:
final_run = await wait_for_flow_run(fr.id)  # returns the FlowRun in its terminal state
- run_deployment and wait_for_flow_run are designed to work across process restarts and were verified to exist in 3.x:
  - API reference: deployments
  - API reference: flow_runs
Operational guardrails for restarts
- Concurrency limits to smooth restarts: set global or work queue concurrency limits so retries after a crash don’t stampede (a small sketch follows at the end of this message).
- Use jitter on retries (retry_jitter_factor) to stagger re-execution.
- Ensure your Kubernetes Deployment for task workers and Prefect Server has proper probes and PDBs.
A few clarifications that will help me tailor this:
- Are you using Prefect Cloud or Prefect Server (self-hosted)? If Server, what’s your backing database?
- Are your background tasks exclusively Prefect tasks (via .delay()), or do you also fan out to separate flow runs?
- Do you already have external result storage configured (S3/GCS/Azure), or should I share a quick setup?
References
- Built for resilience blog: Design patterns for resilient pipelines
- Idempotency best practices: Importance of idempotent pipelines
- Task CLI help (verified): prefect task serve --help
If you share a short code sample of how you’re creating the tasks and awaiting them, I can annotate it with the minimal changes needed to capture run IDs, persist results, and reattach after restarts.
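A minimal sketch of the concurrency-limit guardrail mentioned above, using a tag-based limit (the tag name and value are placeholders; the prefect concurrency-limit CLI can do the same):
Copy code
from prefect import get_client

async def ensure_concurrency_limit():
    # Cap how many task runs carrying this tag execute at once, so a wave of
    # retries after a crash doesn't stampede your infrastructure.
    async with get_client() as client:
        await client.create_concurrency_limit(tag="background-jobs", concurrency_limit=10)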
s
@Marvin I'm using Prefect 3.4.16, self-hosted. The background tasks trigger flows, calling them directly (not using "run_deployment") as I need the execution to be reactive. I'm currently persisting all results in an S3 bucket, and tasks within flows are idempotent. With the solution you bring, how do you recover the flow_run_id? When a pod restarts, how does it know which ids should be re-attached because they were detached from their previous process?
m
thought for 150.4 seconds
Thanks — that context helps. Short answer: you must persist the flow_run_id yourself at submission time. On restart, read those IDs from your store and reattach by querying Prefect for their states/results. If you ever lose an ID, tag your runs with a correlation key so you can reconstruct the mapping by querying for runs with that tag.
Since you’re not using deployments (for reactivity), use the Runner submission API so each run is tracked by Prefect and you get a FlowRun object back immediately:
- Submit reactively and capture IDs
Copy code
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect import tags

# Your flow
from my_flows import my_flow

async def launch_job(job_id: str, params: dict) -> str:
    # Tag the run so it can be discovered if you ever lose the ID
    with tags(f"job:{job_id}", "origin:backend"):
        flow_run = await submit_to_runner(my_flow, parameters=params)

    # Persist mapping job_id -> flow_run_id to your DB
    db.save_mapping(job_id=job_id, flow_run_id=str(flow_run.id))
    return str(flow_run.id)
- Reattach after a backend restart
Copy code
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

async def resume_tracking(job_id: str):
    flow_run_id = db.get_mapping(job_id)
    flow_run = await wait_for_flow_run(flow_run_id)  # waits until terminal state

    if flow_run.state.is_completed():
        result = get_state_result(flow_run.state)  # reads from your S3 result storage
        return result
    else:
        # FAILED/CRASHED/CANCELLED → raises underlying exception if present
        return get_state_result(flow_run.state)
- If you didn’t persist the ID (reconstruction by tag)
Copy code
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterTags

async def find_flow_run_ids_for_job(job_id: str):
    async with get_client() as client:
        # Query for the most recent runs carrying your correlation tag;
        # you can also add time-window / state filters to the FlowRunFilter
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(tags=FlowRunFilterTags(all_=[f"job:{job_id}"])),
            limit=50,
        )
        return [str(fr.id) for fr in runs]
Why this is resilient to pod restarts
- Workers: if a worker pod dies mid-task, Prefect will mark the task run Crashed; with retries configured on tasks, it transitions to AwaitingRetry and is picked up by any healthy worker replica when due.
- Backend: your backend futures go away on a restart, but your DB has job_id -> flow_run_id. You reattach using wait_for_flow_run(flow_run_id) and materialize the result from S3.
Recommended hardening you already mostly have
- Tasks idempotent: great. Keep retries with backoff+jitter on tasks to auto-recover from crashes.
- Persist results to S3: ensures get_state_result can read results after any process dies.
- Add tags like job:{job_id} and origin:backend so you can query or drive Automations.
- Use multiple replicas of prefect task serve workers (with readiness/liveness probes and a PodDisruptionBudget) to ride through rolling updates.
Notes
- Calling a flow function directly in-process will not be resilient to a backend restart. Submitting with submit_to_runner creates a tracked flow run in Prefect and immediately returns the FlowRun object (with ID) so you can persist and reattach.
- If you ever do switch to deployments, run_deployment behaves similarly (returns a FlowRun quickly) but you don’t need it for reactivity if Runner fits your needs.
Docs for reference
- Flow runs API helpers: prefect.flow_runs
- Runner module: prefect.runner
- Tasks (retries, persistence): prefect.tasks
If you share your current submission snippet, I can adapt it to use submit_to_runner and add the minimal persistence (and tag) hooks to make restart recovery plug-and-play.
s
@Marvin When a flow or task has crashed because the runner or process it was attached to has disappeared, is there a way to distinguish that from a real crash? For instance, I'd like to have a specific retry policy on flows when one was detached from its original caller, whereas when an error happens, I need to keep the failure.
m
thought for 101.0 seconds
Yes — Prefect distinguishes “the process disappeared” from “your code raised an error” at the state level:
- CRASHED: The engine/worker/runner died or lost heartbeat while the run was executing (pod killed, OOM, SIGKILL, etc.). No user exception was captured.
- FAILED: Your code raised an exception that Prefect caught and recorded.
That difference is reliable and what you should branch on.
How to apply different policies
Option A: Enforce the policy at the supervisor/backend
- When you submit the flow (via Runner), persist the flow_run_id.
- After completion, read the final state and branch:
  - If state.type == "CRASHED", re-submit (idempotent) with your “detached” retry policy.
  - If state.type == "FAILED", keep the failure.
Example
Copy code
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

async def handle_completion(flow_run_id: str):
    fr = await wait_for_flow_run(flow_run_id, log_states=True)

    if fr.state.type.value == "CRASHED":
        # Detached/crash → re-submit with your policy
        params = db.get_params_for(flow_run_id)
        await resubmit_with_same_params(params)
        return

    if fr.state.type.value == "FAILED":
        # Keep the failure
        exc = None
        try:
            get_state_result(fr.state)  # will raise original exception
        except Exception as e:
            exc = e
        record_failure(flow_run_id, exc)
        return

    # COMPLETED, CANCELLED, etc.
    result = get_state_result(fr.state)
    record_success(flow_run_id, result)
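For completeness, a minimal sketch of what the resubmit_with_same_params helper above might do, reusing the Runner submission pattern from earlier in this thread (my_flow, db, and the helper names are placeholders for your own code):
Copy code
from prefect.runner import submit_to_runner
from my_flows import my_flow  # the flow that was originally submitted

async def resubmit_with_same_params(params: dict) -> str:
    # Idempotent flows/tasks make re-submission safe even if some work
    # already completed before the crash.
    fr = await submit_to_runner(my_flow, parameters=params)
    db.save_resubmission(flow_run_id=str(fr.id), params=params)  # db: your own storage
    return str(fr.id)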
Option B: Use an Automation on “Crashed” events
- Create an Automation with an Event trigger on flow-run crashed and a tag filter (e.g., origin:backend).
- Action: Call a webhook in your backend that re-submits the same job parameters. Do nothing on Failed events.
Links:
- API reference: prefect.automations
- Events overview: Events
What about tasks inside flows?
- Tasks can have conditional retries with retry_condition_fn, which lets you suppress retries on “application errors” you don’t want retried.
- Example: only retry on infrastructure/transient exceptions you whitelist; let business logic errors fail immediately.
Copy code
from prefect import task

def retry_only_on_transient(task, task_run, state) -> bool:
    # retry_condition_fn receives the task, the task run, and the final state.
    # Return True to retry; False to keep the failure.
    try:
        state.result()  # re-raises the exception captured in the Failed state
    except (TimeoutError, ConnectionError):
        return True     # transient / infrastructure-like error -> retry
    except Exception:
        return False    # application error -> keep the failure
    return False        # no exception captured -> nothing to retry

@task(
    retries=5,
    retry_delay_seconds=[5, 10, 20, 40, 60],
    retry_jitter_factor=0.2,
    retry_condition_fn=retry_only_on_transient,
    persist_result=True,
)
def do_work(x: int) -> int:
    ...
Note: If the process dies, there’s no Python exception for that attempt; the task/flow run will be marked CRASHED by the server. The retry_condition_fn only runs when an exception is captured.
In practice:
- Use task-level retry_condition_fn to control retries for in-process exceptions.
- Use state.type branching (CRASHED vs FAILED) at the flow level (or in your supervisor) to decide on re-submission when a pod/runner disappears.
Useful API bits
- State types: prefect.states
- Read a flow run and inspect its state
Copy code
from prefect import get_client

async def is_crashed(flow_run_id: str) -> bool:
    async with get_client() as client:
        fr = await client.read_flow_run(flow_run_id)
    return fr.state.type.value == "CRASHED"
- You can also inspect fr.state.message to differentiate “lost heartbeat” style crashes from others if you need finer-grained routing (a small sketch follows at the end of this message).
Given your setup (self-hosted, flows submitted reactively, S3 result storage, idempotent tasks), the simplest and most robust approach is:
- Persist flow_run_id at submission time.
- After completion, branch on final state.type:
  - CRASHED → re-submit with your crash-specific policy.
  - FAILED → keep failure.
- Optionally add an Automation to trigger your webhook on flow-run crashed to avoid polling.
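A minimal sketch of that finer-grained routing; the "heartbeat" substring is an assumption about the crash message text, so verify it against what your server actually records:
Copy code
from prefect import get_client

async def classify_crash(flow_run_id: str) -> str:
    async with get_client() as client:
        fr = await client.read_flow_run(flow_run_id)
    if fr.state.type.value != "CRASHED":
        return "not-crashed"
    # The exact wording of crash messages is server/version dependent
    message = (fr.state.message or "").lower()
    return "lost-heartbeat" if "heartbeat" in message else "other-crash"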
s
@Marvin if a flow runs a sub-flow, will both flows be CRASHED if the process is killed during the run of the subflow?
m
thought for 38.4 seconds
Short answer: if the subflow is executed in-process (the usual case when you call another @flow from a flow), killing the process will stop heartbeats for both runs and they will both end in CRASHED state. The parent cannot “handle” the child’s crash because it died too.
Details
- Default subflow call (in-process): calling an @flow from another @flow creates a separate flow run, but the same engine process executes both. If the pod/process dies during the subflow, the server will detect missing heartbeats for both the child and the parent and mark both runs CRASHED.
- Remote subflow pattern (separate infra): if the parent triggers the child as a separate run on different infrastructure (e.g., via run_deployment or submit_to_runner from inside the flow), then killing the parent’s process will typically mark only the parent CRASHED; the child can continue running because it’s in another process/pod. There’s no automatic cascade unless you add one (e.g., an Automation that cancels children when the parent crashes).
What you can do with this
- If you want to re-run only the parent after an in-process crash, you can key off the parent’s state.type == CRASHED and resubmit just the root flow.
- If you use remote subflows and want cascading behavior, add an Automation on “flow-run crashed” for the parent that:
  - finds children by parent_flow_run_id (or by a shared tag), then
  - cancels them or triggers your retry policy.
- Conversely, if you want children to continue independently, keep them remote and avoid cascaded cancellation.
Finding children by parent
- Each subflow run records parent_flow_run_id and root_flow_run_id. You can query with the client for children if needed and decide how to handle them after a parent crash.
References
- State types: prefect.states
- Runner helpers: prefect.runner
If you share how you’re invoking the subflow (in-process vs submitting another run), I can give you concrete snippets for detection and the Automation/cancellation pattern.
s
@Marvin When trying to submit a flow to a runner, I get the following error:
Copy code
Message: "Task run failed with exception: RuntimeError('Failed to connect to the `Runner` webserver. Ensure that the server is running and reachable. You can run the webserver either by starting your `serve` process with `webserver=True`, or by setting `PREFECT_RUNNER_SERVER_ENABLE=True`.')"
What should I enable? Is the approach you described valid with flows that are not deployed?
m
thought for 245.7 seconds
You’re hitting that error because submit_to_runner needs a Runner webserver to be running and reachable. Right now, there isn’t one (or it’s not reachable from your submitting process), so the submission can’t be accepted.
What to enable
- Start a Runner with its webserver enabled in the process that will execute your flows:
  - In Python: pass webserver=True when you start your serve process
  - Or via env: set PREFECT_RUNNER_SERVER_ENABLE=true in that process/pod
Practical ways to run it
- Python API (useful in your Kubernetes “runner” pod):
Copy code
from prefect.flows import serve
from my_flows import my_flow  # the flow(s) you plan to submit

if __name__ == "__main__":
    # Start a Runner, enable its webserver, and serve these flows
    serve(my_flow, limit=20, webserver=True)
- CLI (if you prefer):
Copy code
# In the pod that will execute flows
# Set the env var so the Runner’s webserver is started
export PREFECT_RUNNER_SERVER_ENABLE=true
prefect flow serve ./path/to/file.py:my_flow -n my-flow-deployment
Note: the CLI doesn’t expose a --webserver flag; use the env var as shown.
Networking in Kubernetes
- By default the webserver binds to localhost. If your submitter runs in a different pod, expose the Runner webserver through a Service and set the Runner server settings so it binds to a routable address (e.g., 0.0.0.0) and your submitter knows where to reach it.
- Set the Runner server settings via env (e.g., PREFECT_RUNNER_SERVER_ENABLE=true, plus host/port if you need to change the bind address). If you want, I can share a minimal K8s Deployment + Service manifest.
Is this valid without deployments?
- Yes. This Runner-based approach is specifically for flows that are not deployed. It remains reactive: submit_to_runner returns a FlowRun immediately and the run is executed by the Runner process that you’ve “served.”
Minimal submitter example (your backend)
Copy code
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
from my_flows import my_flow

# Submit reactively
fr = await submit_to_runner(my_flow, parameters={"x": 1})

# Persist fr.id somewhere durable, then reattach later if needed
final = await wait_for_flow_run(fr.id)
result = get_state_result(final.state)  # reads from S3 since you persist results
Docs for reference
- Runner API: prefect.runner
- Flow runs helpers: prefect.flow_runs
- Settings (env): prefect.settings
If you share how your pods are arranged (which pod will host the Runner and which will submit), I can give you exact env vars and a small K8s manifest to make the webserver reachable between them.
s
@Marvin it doesn't work:
• the serve method does not take a webserver parameter
• the PREFECT_RUNNER_SERVER_ENABLE=true has no effect
I still get the error when trying to call the flow from a background task, using the submit_to_runner function. Here is the code of my worker:
Copy code
@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(30)
    print(f"dummy task done: {input}")

@task
async def dummy_task_serve():
    with tags("resilient"):
        flow_run: FlowRun = await submit_to_runner(long_flow, parameters={"input": "test"})
    await wait_for_flow_run(flow_run.flow_id)


if __name__ == "__main__":
    serve(dummy_task_serve)
I call this to trigger the background task from my other backend:
Copy code
dummy_task_serve.delay()
=> What I'm trying to do is get the flow or background task rescheduled in case the worker restarts or crashes.
m
thought for 152.1 seconds
Thanks for the concrete example — a few important things are going on here:
- You’re calling submit_to_runner from a task, but you are not actually running a Runner webserver anywhere. You’re “serving” a task, not your flow.
- Your call to wait_for_flow_run is using flow_id instead of the flow_run_id (should be fr.id).
- The webserver toggle must be enabled in the process that serves flows (the Runner), not in your task worker. Setting PREFECT_RUNNER_SERVER_ENABLE on the task worker won’t help if there’s no Runner process up.
What to change
1) Run a Runner process that serves the flow(s) you want to submit
- Put your flows in a small “flow runner” process/pod and start it with the Runner webserver enabled. Don’t serve your submitter task here; serve the flow(s).
Example runner.py
Copy code
from prefect import flow, serve

@flow
async def long_flow(input: str):
    print(f"running: {input}")
    # ... your work ...

if __name__ == "__main__":
    # In this process/pod, set:
    #   PREFECT_RUNNER_SERVER_ENABLE=true
    # Optionally also set host/port and expose with a Service if other pods will connect.
    serve(long_flow, limit=10)
- In Kubernetes:
  - Set env var PREFECT_RUNNER_SERVER_ENABLE=true in this pod.
  - If the submitter runs in another pod, bind the webserver to 0.0.0.0 and expose it via a Service. Point your submitter at that URL. (We can share a minimal Deployment + Service if helpful.)
  - Note: PREFECT_RUNNER_SERVER_ENABLE must be set in this “flow runner” pod, not just anywhere.
2) Run your task worker separately
- This is the pod that executes your background tasks (.delay()).
- It can call submit_to_runner as long as it can reach the Runner webserver over the network.
Your task code (fixed)
Copy code
import asyncio
from prefect import task, flow, tags
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(30)
    print(f"dummy task done: {input}")

@task(retries=3, retry_delay_seconds=[10, 30, 60])  # optional resilience on the submitter task itself
async def dummy_task_serve():
    with tags("resilient"):
        fr = await submit_to_runner(long_flow, parameters={"input": "test"})
    # BUGFIX: wait for the flow RUN id, not the flow id
    final = await wait_for_flow_run(fr.id)
    result = get_state_result(final.state)  # reads from S3 if you persist results
    return result
Start your task worker with:
Copy code
prefect task serve ./path/to/file.py:dummy_task_serve --limit 20
Reference: prefect task serve CLI help (prefect task serve --help).
3) Networking between submitter and runner
- If they’re different pods:
  - Runner pod: set PREFECT_RUNNER_SERVER_ENABLE=true and bind to a routable address; expose with a Service.
  - Submitter pod: point submit_to_runner at the Runner server URL (configure the Runner server URL setting for the client). Make sure both pods share the same Prefect API profile (PREFECT_API_URL/PREFECT_API_KEY) so the flow run is registered in the same backend.
- If they’re the same pod/process, submitting to localhost should “just work” once the Runner webserver is enabled.
4) Your original question: Is this valid for non-deployed flows?
- Yes. This Runner pattern is explicitly for flows that are not deployed. You get reactive submissions via submit_to_runner, plus resiliency because the flow run is tracked by Prefect and results are persisted (S3 in your case).
Common pitfalls to check
- Make sure you are serving the flow(s) with the Runner (not the submitter task).
- Ensure PREFECT_RUNNER_SERVER_ENABLE=true is set in the Runner process/pod.
- Ensure the submitter can reach the Runner webserver (Service/URL, network policy).
- Use fr.id (flow_run_id) with wait_for_flow_run, not flow_id.
If you still get the connection error after splitting the processes, please share:
- How you are starting the Runner process/pod and its logs at startup
- The env vars you’ve set for both pods (Runner and submitter)
- Whether the pods are in the same namespace and how you are addressing the Runner Service
Relevant docs
- Runner API: prefect.runner
- Flow run helpers: prefect.flow_runs
- Settings (including Runner server settings): prefect.settings
s
@Marvin is there a way to serve background tasks and flows in the same process? Can I do that using threads?
m
thought for 115.6 seconds
Yes — you can run a Task Worker and a Flow Runner in the same process. Since both prefect.task_worker.serve(...) and prefect.flows.serve(...) are long-running/blocking loops, the simplest way is to start one in a background thread and the other in the main thread.
Key points
- You must start a Runner that serves the flow(s) you plan to submit. This is what submit_to_runner connects to.
- You can also serve your background “submitter” task in the same process via the Task Worker.
- Enable the Runner webserver via environment (there’s no webserver= parameter on serve). This env var must be set in the process that calls prefect.flows.serve(...).
Minimal example (single process, two threads)
Copy code
import os
import threading
import asyncio

from prefect import flow, task, tags
from prefect.task_worker import serve as serve_tasks
from prefect.flows import serve as serve_flows
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(5)
    print(f"dummy task done: {input}")

@task(retries=3, retry_delay_seconds=[5, 10, 20])
async def dummy_task_serve():
    # Submit a non-deployed flow reactively to the local Runner
    with tags("resilient"):
        fr = await submit_to_runner(long_flow, parameters={"input": "test"})
    # IMPORTANT: wait for the flow RUN id (fr.id), not the flow_id
    final = await wait_for_flow_run(fr.id)
    return get_state_result(final.state)

def start_task_worker():
    # Serve your background task so `.delay()` works
    serve_tasks(dummy_task_serve, limit=10)

def start_flow_runner():
    # Enable the Runner webserver so submit_to_runner can connect
    os.environ["PREFECT_RUNNER_SERVER_ENABLE"] = "true"
    # Optionally: set host/port via Prefect settings if you need a non-default bind
    serve_flows(long_flow, limit=10)

if __name__ == "__main__":
    # Start the Task Worker in a background thread
    t = threading.Thread(target=start_task_worker, daemon=True)
    t.start()

    # Run the Flow Runner (with webserver) in the main thread
    start_flow_runner()
How you’ll use it
- Your other backend can call dummy_task_serve.delay() to trigger the background task. That task will submit long_flow to the Runner that’s running in the same process.
- If the pod restarts, Prefect will reschedule crashed runs (with your retries) and you can reattach by querying flow_run_ids you persisted.
Why this fixes your earlier error
- Previously, you were serving only a task and submitting a flow — but there was no Runner webserver running to accept the submission. In this pattern, the Runner is started in the same process (and we explicitly enable the webserver), so submit_to_runner has something to talk to.
Notes and gotchas
- Threads are fine here; each “serve” loop manages its own async runtime. If you want stronger isolation, you can use two processes instead, but threads are simpler and meet your “single process” goal.
- If you split the submitter and runner across pods later, you’ll need to expose the Runner webserver via a Service and ensure the submitter can reach it.
- Be sure to persist fr.id (the flow run ID) when you submit, so your backend can reattach after restarts.
- In your original snippet you used wait_for_flow_run(flow_run.flow_id) — that’s wrong; use fr.id.
Helpful references
- Runner API (submit_to_runner): prefect.runner
- Flow run helpers: prefect.flow_runs
- Task worker CLI (verified): prefect task serve
- Flow serve CLI: prefect flow serve
If you want, I can translate the above into a small K8s Deployment that runs both services in one container and keeps the Runner webserver reachable only within the pod.
s
@Marvin currently, I'm not in Kubernetes but in my development environment, where things run with Docker Compose. I've set up serving of flows and background tasks in the same process, and used a work pool of type "process". What I don't understand though is: 1. When running my "long_flow" from the prefect server UI, I get this error:
Copy code
File "/usr/lib/python3.11/asyncio/events.py", line 637, in get_child_watcher
    raise NotImplementedError
2. When trying to submit the flow using submit_to_runner from the background task, I see from the UI that it creates a task run with no name that stays in "pending" state.
m
thought for 179.2 seconds
Thanks for the details — two separate issues here.
1) Running from the UI → NotImplementedError: get_child_watcher
What’s happening:
- When you click “Run” in the UI, that creates a deployment run that is executed by a Prefect Worker (e.g., “process” work pool).
- On Python 3.11, creating subprocesses from a non-main thread on Unix requires an asyncio “child watcher”. If the current event loop policy does not provide one, asyncio raises NotImplementedError when something tries to spawn a subprocess from a thread.
Why you’re hitting it:
- The ProcessWorker uses asyncio to launch your flow run in a subprocess. On some setups (Python 3.11, Docker), that call happens in a thread where no child watcher is installed, so Python raises the NotImplementedError you see.
Ways to resolve:
- Easiest: for dev, don’t use the “process” work pool path at all. If you’re using the Runner pattern for reactivity, stick to submit_to_runner and don’t click “Run” in the UI for this flow. Runner runs do not involve the ProcessWorker and bypass this child watcher issue.
- If you do want to run from the UI with a process work pool, add a child watcher at worker startup. Since the CLI worker does not add one, wrap its start in a tiny Python entrypoint that installs a watcher before exec’ing the Prefect worker:
Copy code
# worker_entrypoint.py
import asyncio, os

try:
    # Install a child watcher so subprocesses from threads don't blow up
    asyncio.get_event_loop_policy().set_child_watcher(asyncio.SafeChildWatcher())
except Exception:
    pass

os.execvp("prefect", ["prefect", "worker", "start", "-p", os.environ["PREFECT_WORK_POOL_NAME"]])
Then in your Docker Compose service for the worker, run:
Copy code
command: ["python", "/app/worker_entrypoint.py"]
This makes the ProcessWorker safe to spawn subprocesses even when the call comes from a non-main thread.
2) submit_to_runner from your background task → “task run with no name” stuck in Pending
This typically means your submission did not reach a live Runner webserver, or you’re looking at the wrong run in the UI.
Checklist to fix/confirm:
- Make sure the Runner webserver is actually enabled in the same process that is serving your flows. There’s no webserver= argument on serve; you must enable it via environment in that process before serve(...) starts.
- Set this env var in the container/process that runs prefect.flows.serve(...):
Copy code
PREFECT_RUNNER_SERVER_ENABLE=true
- On startup, check logs for a message that the Runner webserver started. If you don’t see a clear “Runner webserver is running at …” log, it’s not enabled.
- Serve flows and tasks in the same process (your dev goal) with two threads, and enable the Runner server before serving flows:
Copy code
import os, threading, asyncio
from prefect import flow, task, tags
from prefect.task_worker import serve as serve_tasks
from prefect.flows import serve as serve_flows
from prefect.runner import submit_to_runner
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def long_flow(input: str):
    await dummy_task(input)

@task
async def dummy_task(input: str):
    print(f"dummy task running: {input}")
    await asyncio.sleep(5)
    print(f"dummy task done: {input}")

@task
async def dummy_task_serve():
    with tags("resilient"):
        fr = await submit_to_runner(long_flow, parameters={"input": "test"})
    # IMPORTANT: wait on the flow RUN id, not flow_id
    final = await wait_for_flow_run(fr.id)
    return get_state_result(final.state)

def start_task_worker():
    serve_tasks(dummy_task_serve, limit=10)

def start_flow_runner():
    os.environ["PREFECT_RUNNER_SERVER_ENABLE"] = "true"
    serve_flows(long_flow, limit=10)

if __name__ == "__main__":
    t = threading.Thread(target=start_task_worker, daemon=True)
    t.start()
    start_flow_runner()
- If you still see “Pending”:
  - Confirm you’re looking at the flow run created by submit_to_runner (resource: Flow run). The “task run with no name” you’re seeing may be your background task itself (dummy_task_serve), which will remain Running/Pending while it awaits the child flow run. The flow run should transition to Running and then Completed/Failed.
  - Ensure you’re awaiting the correct ID: use fr.id (flow_run_id) with wait_for_flow_run. Using fr.flow_id will not complete and can look like a “stuck” task.
  - Verify both loops are alive: you should see logs for the Task Worker picking up dummy_task_serve and the Runner picking up long_flow. If the Runner webserver isn’t up, submission will fail or hang.
Putting it together in dev with Docker Compose
- One container that runs both the Task Worker and Flow Runner (two threads) as shown above.
- Do not involve a Process work pool/worker for this path. Use your Runner for all reactive runs.
- If you also want to test “UI run” with a Process worker, run a separate worker container using the SafeChildWatcher shim shown above.
If you share:
- your current compose services (env vars, entrypoints, commands), and
- the exact log lines you see when starting the combined process,
I can spot-check environment placement (e.g., making sure PREFECT_RUNNER_SERVER_ENABLE is set in the same process that calls prefect.flows.serve) and help ensure the Runner webserver is actually live.