<@ULVA73B9P> I have a deployed and scheduled flow ...
# ask-marvin
o
@Marvin I have a deployed and scheduled flow that creates runs that seem to keep going on "forever" despite completing all tasks. Any ideas as to why? What should I do with them to "clean up"?
m
thought for 364.1 seconds
A few common reasons a flow run can sit in Running “forever” even though your tasks finished:
- The flow function never actually returns (e.g., lingering background threads, an un-awaited async task, a blocking loop/sleep at the end, or open network streams).
- A subflow/deployment was triggered and not awaited, so the parent run is still waiting.
- The final state update didn’t make it back to the API (network hiccup, worker shutdown timing), so the server still shows Running even though the process ended.
- The process is alive with non-daemon threads keeping Python from exiting, so the run keeps heartbeating.
Cleanup options
- List stuck runs:
prefect flow-run ls --state RUNNING --limit 100
- Cancel them (preferred if the process might still be alive):
prefect flow-run cancel <flow_run_id>
- If you just want to remove them from the UI/history after cancelling:
prefect flow-run delete <flow_run_id>
- If a run is clearly finished but never transitioned, you can force-set its state via the client:
from uuid import UUID
from prefect.client.orchestration import get_client
from prefect.states import Completed

async def mark_completed(run_id: str):
    async with get_client() as client:
        await client.set_flow_run_state(UUID(run_id), Completed(name="Manual fix"), force=True)
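For example, you could call that helper from a one-off script like this (the run ID is a placeholder):
import asyncio

asyncio.run(mark_completed("<flow_run_id>"))  # placeholder: paste the stuck run's ID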
Docs: API client and REST: set flow run state
How to prevent this going forward
- Ensure the flow returns: add a final log and return value; avoid infinite loops or sleeps at the end.
- Close/await background work:
  - Await all async tasks you start.
  - Avoid leaving non-daemon threads running (some libraries start worker threads; ensure they’re shut down).
  - Close streaming responses/clients so nothing blocks shutdown.
- If you trigger subflows/deployments, make your intent explicit: if you expect to wait for them, await the result or use the appropriate wait helper (a sketch follows after the timeout example below).
- Add a guardrail with a flow-level timeout:
from prefect import flow

@flow(timeout_seconds=3600)
def my_flow():
    ...
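As a sketch of the earlier point about making subflow/deployment intent explicit: when you trigger another deployment and expect to wait for it, run_deployment polls until the child run reaches a final state by default (the deployment name below is a placeholder):
from prefect import flow
from prefect.deployments import run_deployment

@flow
async def parent_flow():
    # waits for the child run to finish because timeout=None is the default;
    # pass timeout=0 if you intend to fire-and-forget instead
    child_run = await run_deployment(
        name="my-flow/my-deployment",  # placeholder deployment name
    )
    return child_run.state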
A couple quick checks to diagnose root cause
- Do worker logs say “Flow run finished in state Completed”? If yes but the UI shows Running, this was likely a final state update issue — force-set or cancel as above.
- Are there subflows or external processes launched from your flow?
- Any async tasks or threads started that aren’t awaited/joined?
- What worker/infrastructure are you using (process, Docker, Kubernetes)?
If you can share:
- A snippet of the flow’s end (post-task code),
- Whether you see a “finished” message in logs,
- Your worker type/infrastructure,
…I can help pinpoint the exact cause and suggest a targeted fix.
o
@Marvin I have a Docker worker pool and flow runs execute in created containers, locally hosted. The log says Finished in state Completed() but not the Process for flow run ... exited cleanly message. This has happened twice in a row for the same flow; no other flows have this issue. Nothing external is launched from the flow, and there is no async work.
m
thought for 181.4 seconds
Thanks for the details — that helps. A few targeted notes for your setup:
- In Docker worker pools, you won’t see the “Process for flow run … exited cleanly” message; that’s specific to the process-based infrastructure. The key symptom is the run staying RUNNING in the UI even though your flow logs say “Finished in state Completed()”.
Given it’s only this one flow and it’s happened twice in a row, the most likely cause is something in that flow leaving a non-daemon resource alive after the flow returns (e.g., a thread or pool that wasn’t shut down). Python will keep the container alive in that case even though Prefect set the flow state to Completed.
Immediate cleanup
- List stuck runs:
prefect flow-run ls --state RUNNING --limit 100
- Cancel the stuck ones:
prefect flow-run cancel <flow_run_id>
- Optionally remove them from history:
prefect flow-run delete <flow_run_id>
- If you want to force-complete a specific run that truly finished, you can set the state directly (a bulk variant for multiple stuck runs follows after this snippet):
from uuid import UUID
from prefect.client.orchestration import get_client
from prefect.states import Completed

async def force_complete(run_id: str):
    async with get_client() as client:
        await client.set_flow_run_state(UUID(run_id), Completed(name="Manual fix"), force=True)
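If several runs of the same flow are stuck, a rough bulk variant of the same idea is possible with the client; a sketch assuming a recent Prefect 2.x/3.x install (the flow name is a placeholder, and the exact filter classes may vary slightly between versions):
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Completed

async def force_complete_stuck_runs(flow_name: str):
    async with get_client() as client:
        # find RUNNING runs of the named flow
        runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING])
                )
            ),
        )
        for run in runs:
            # use Cancelled(...) from prefect.states instead if you don't want these to read as successful
            await client.set_flow_run_state(run.id, Completed(name="Manual fix"), force=True)

asyncio.run(force_complete_stuck_runs("my-flow"))  # placeholder flow name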
Links: API client and REST: set flow run state
Confirm what Prefect thinks is happening
- Inspect a run’s state history to verify the last orchestration state Prefect saved:
prefect flow-run inspect <flow_run_id>
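If you prefer to check from Python, a minimal sketch with the client that prints the state the API has recorded (the run ID is a placeholder):
import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client

async def show_server_state(run_id: str):
    async with get_client() as client:
        run = await client.read_flow_run(UUID(run_id))
        # state_type/state_name reflect what the server believes, regardless of the container
        print(run.name, run.state_type, run.state_name)

asyncio.run(show_server_state("<flow_run_id>"))  # placeholder run ID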
What typically causes this in a single flow
- ThreadPoolExecutor or ProcessPoolExecutor created but not closed. If you use these anywhere, ensure a context manager or explicit shutdown:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=...) as ex:
    ex.map(...)
# ensures shutdown and no leftover non-daemon threads
- A library that starts non-daemon background threads (e.g., Kafka consumers/producers, some gRPC/streaming clients, QueueListener/QueueHandler for logging). These often need an explicit .close()/.stop()/.join() (a generic shutdown sketch follows after the diagnostics snippet below).
- Flow state hooks (on_completion) doing blocking work or starting threads that are not joined.
Quick diagnostics to pinpoint the culprit
- At the very end of the flow, log live threads to see what’s keeping Python alive:
import threading
from prefect import flow

@flow
def my_flow():
    ...
    # right before return:
    # non-daemon threads listed here are what keep Python (and the container) alive
    alive = [t.name for t in threading.enumerate()]
    print(f"End-of-flow thread snapshot: {alive}")
    return ...
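For reference, a minimal generic sketch of the stop-and-join pattern those bullets describe; the background poller here just stands in for whatever thread a library might start (real clients usually expose their own .close()/.stop()):
import threading
import time

from prefect import flow

def background_poller(stop_event: threading.Event):
    # stands in for a consumer/poller loop a library might run on its own thread
    while not stop_event.is_set():
        time.sleep(0.1)

@flow
def my_flow():
    stop_event = threading.Event()
    worker = threading.Thread(target=background_poller, args=(stop_event,))  # non-daemon by default
    worker.start()
    try:
        time.sleep(1)  # placeholder for the real work
    finally:
        stop_event.set()  # ask the worker to stop
        worker.join()  # without this join, the thread keeps the container alive after the flow returns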
- If you use executors anywhere, wrap them with a context manager as above.
- As a one-time test (not a permanent fix), you could force-exit at the end of the flow to confirm it’s a lingering-thread issue:
import os

from prefect import flow

@flow
def my_flow():
    ...
    os._exit(0)  # only as a diagnostic — skips normal cleanup
If that makes the container stop and the run finish, we’ve confirmed a leaked non-daemon resource.
Safety net
- Add a flow-level timeout as a guardrail so a future hang at teardown doesn’t run “forever”:
from prefect import flow

@flow(timeout_seconds=3600)
def my_flow():
    ...
If you can share:
- The flow run ID and the tail of the logs after “Finished in state Completed()”
- Whether this flow uses ThreadPoolExecutor/ProcessPoolExecutor, Kafka, BigQuery Storage, QueueListener/QueueHandler, or any streaming clients
- Whether the Docker container for that run is still up (docker ps -a) while the UI shows RUNNING
…I can help pinpoint the exact leak and give you a specific shutdown pattern to add.