Olve Heitmann
10/03/2025, 10:34 PM
Marvin
10/03/2025, 10:40 PM
prefect flow-run ls --state RUNNING --limit 100
- Cancel them (preferred if the process might still be alive):
prefect flow-run cancel <flow_run_id>
- If you just want to remove them from the UI/history after cancelling:
prefect flow-run delete <flow_run_id>
- If a run is clearly finished but never transitioned, you can force-set its state via the client:
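import asyncio
from uuid import UUID
from prefect.client.orchestration import get_client
from prefect.states import Completed

async def mark_completed(run_id: str):
    # force=True tells the API to accept the state even if orchestration rules would reject it
    async with get_client() as client:
        await client.set_flow_run_state(UUID(run_id), Completed(name="Manual fix"), force=True)

asyncio.run(mark_completed("<flow_run_id>"))  # replace with the stuck run's ID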
Docs: API client and REST: set flow run state
How to prevent this going forward
- Ensure the flow actually returns: add a final log line and a return statement; avoid infinite loops or sleeps at the end.
- Close/await background work:
  - Await all async tasks you start.
  - Avoid leaving non-daemon threads running (some libraries start worker threads; ensure they’re shut down).
  - Close streaming responses/clients so nothing blocks shutdown.
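The first two cleanup rules in that list, as a minimal plain-Python sketch assuming your flow body is async. flow_body, async_job and background_job are hypothetical stand-ins; in a real deployment flow_body would be your @flow-decorated async function.

```python
import asyncio
import threading
import time


def background_job(results: list[str]) -> None:
    # Hypothetical blocking work running on its own non-daemon thread
    time.sleep(0.1)
    results.append("thread done")


async def async_job(n: int) -> int:
    # Hypothetical async work started from the flow
    await asyncio.sleep(0.05)
    return n * 2


async def flow_body() -> tuple[list[int], list[str]]:
    thread_results: list[str] = []
    worker = threading.Thread(
        target=background_job, args=(thread_results,), name="bg-worker"
    )
    worker.start()

    # Keep a reference to every task and await them all before returning
    tasks = [asyncio.create_task(async_job(i)) for i in range(3)]
    doubled = await asyncio.gather(*tasks)

    # Join the thread without blocking the event loop
    await asyncio.to_thread(worker.join)
    return list(doubled), thread_results
```

When flow_body returns, every task it created has finished and bg-worker is no longer alive, so nothing it started can hold the process open.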
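- If you trigger subflows/deployments, make your intent explicit:
  - If you expect to wait for them, await the result or wait on the child run explicitly (for example, run_deployment polls until the child reaches a final state unless you pass timeout=0).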
- Add a guardrail with a flow-level timeout:
from prefect import flow
@flow(timeout_seconds=3600)
def my_flow():
    ...
A couple of quick checks to diagnose the root cause
- Do the worker logs say “Finished in state Completed()”? If yes but the UI shows Running, this was likely a final state update issue; force-set or cancel as above.
- Are there subflows or external processes launched from your flow?
- Any async tasks or threads started that aren’t awaited/joined?
- What worker/infrastructure are you using (process, Docker, Kubernetes)?
If you can share:
- A snippet of the flow’s end (post-task code),
- Whether you see a “finished” message in logs,
- Your worker type/infrastructure,
…I can help pinpoint the exact cause and suggest a targeted fix.
Olve Heitmann
10/03/2025, 10:46 PM
Olve Heitmann
10/03/2025, 10:52 PM
Marvin
10/03/2025, 10:55 PM
prefect flow-run ls --state RUNNING --limit 100
- Cancel the stuck ones:
prefect flow-run cancel <flow_run_id>
- Optionally remove them from history:
prefect flow-run delete <flow_run_id>
- If you want to force-complete a specific run that truly finished, you can set the state directly:
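import asyncio
from uuid import UUID
from prefect.client.orchestration import get_client
from prefect.states import Completed

async def force_complete(run_id: str):
    # force=True tells the API to accept the state even if orchestration rules would reject it
    async with get_client() as client:
        await client.set_flow_run_state(UUID(run_id), Completed(name="Manual fix"), force=True)

asyncio.run(force_complete("<flow_run_id>"))  # replace with the stuck run's ID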
Links: API client and REST: set flow run state
Confirm what Prefect thinks is happening
- Inspect the run to verify the last state Prefect recorded for it:
prefect flow-run inspect <flow_run_id>
What typically causes this in a single flow
- ThreadPoolExecutor or ProcessPoolExecutor created but not closed. If you use these anywhere, ensure a context manager or explicit shutdown:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=...) as ex:
    ex.map(...)
# ensures shutdown and no leftover non-daemon threads
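- A library that starts background threads it expects you to shut down (e.g., Kafka consumers/producers, some gRPC/streaming clients, QueueListener/QueueHandler for logging). These often need an explicit .close(), .stop(), or .join(). (QueueListener’s own thread is a daemon, so it won’t block exit by itself, but skipping .stop() can drop log records still in the queue.)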
- Flow state hooks (on_completion) doing blocking work or starting threads that are not joined.
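Taking QueueListener from that list as a concrete stdlib case, a minimal sketch of the explicit shutdown; ListHandler, log_through_queue and the "queue-demo" logger name are hypothetical. stop() enqueues a sentinel, lets the listener thread drain everything already queued, then joins it, so no record is lost and no listener thread outlives the call.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical sink that collects messages so we can check nothing was dropped."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def log_through_queue(count: int = 3) -> list[str]:
    log_queue: queue.Queue = queue.Queue()
    sink = ListHandler()
    listener = logging.handlers.QueueListener(log_queue, sink)

    logger = logging.getLogger("queue-demo")
    logger.setLevel(logging.INFO)
    queue_handler = logging.handlers.QueueHandler(log_queue)
    logger.addHandler(queue_handler)

    listener.start()  # starts the listener's background thread
    try:
        for i in range(count):
            logger.info("message %d", i)
    finally:
        # Drain the queue and join the listener thread, then detach the handler
        listener.stop()
        logger.removeHandler(queue_handler)
    return sink.messages
```

The same try/finally shape applies to Kafka or streaming clients: whatever starts the background thread gets its close()/stop() in a finally block before the flow returns.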
Quick diagnostics to pinpoint the culprit
- At the very end of the flow, log live threads to see what’s keeping Python alive:
import threading
from prefect import flow

@flow(log_prints=True)  # log_prints=True routes print() output into the Prefect run logs
def my_flow():
    ...
    # right before return:
    alive = [t.name for t in threading.enumerate()]
    print(f"End-of-flow thread snapshot: {alive}")
    return ...
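Since only non-daemon threads keep the interpreter alive, a sharper version of that snapshot filters for them and includes each one’s current stack. lingering_threads and report_lingering_threads are hypothetical helpers using only the stdlib (sys._current_frames is underscored but documented). Prefect’s own helper threads, or the thread running your flow, may also appear, so look for names you don’t recognize.

```python
import sys
import threading
import traceback


def lingering_threads() -> list[threading.Thread]:
    # Non-daemon threads other than the main thread are what block interpreter exit
    main = threading.main_thread()
    return [t for t in threading.enumerate() if t is not main and not t.daemon]


def report_lingering_threads() -> str:
    frames = sys._current_frames()  # maps thread ident -> that thread's current frame
    parts: list[str] = []
    for t in lingering_threads():
        parts.append(f"Thread {t.name!r} (ident={t.ident}) is still alive:\n")
        frame = frames.get(t.ident)
        if frame is not None:
            # Shows the exact call the thread is blocked in (e.g. a socket read or queue get)
            parts.extend(traceback.format_stack(frame))
    return "".join(parts)
```

Print report_lingering_threads() right before the flow’s return; the stack of an unfamiliar thread usually points straight at the library that started it.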
- If you use executors anywhere, wrap them with a context manager as above.
- As a one-time test (not a permanent fix), you could force-exit at the end of the flow to confirm it’s a lingering-thread issue:
import os
from prefect import flow

@flow
def my_flow():
    ...
    os._exit(0)  # only as a diagnostic; skips normal cleanup
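If that makes the container stop, we’ve confirmed a leaked non-daemon resource. Because os._exit fires before the flow returns, Prefect won’t record a Completed state for that test run, so judge the result by whether the container exits.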
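A gentler diagnostic than os._exit, not part of the original suggestion but built on the stdlib faulthandler module: arm a watchdog at the end of the flow that, if the process is still alive after a delay, writes every thread’s stack to stderr without killing anything. arm_exit_watchdog, disarm_exit_watchdog and the 120-second default are hypothetical choices.

```python
import faulthandler
import sys
from typing import Optional, TextIO


def arm_exit_watchdog(seconds: float = 120.0, file: Optional[TextIO] = None) -> None:
    # If the process has not exited `seconds` from now, dump all thread stacks.
    # exit=False keeps it purely diagnostic: the process keeps running afterwards.
    faulthandler.dump_traceback_later(
        seconds,
        repeat=False,
        file=file if file is not None else sys.stderr,
        exit=False,
    )


def disarm_exit_watchdog() -> None:
    # Cancel a pending dump, e.g. when reusing this in a long-lived process
    faulthandler.cancel_dump_traceback_later()
```

Call arm_exit_watchdog() as the last line before the flow returns; a normal exit produces no output, while a hang at teardown should leave the stacks in the container’s logs.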
Safety net
- Add a flow-level timeout as a guardrail so a future hang at teardown doesn’t run “forever”:
from prefect import flow

@flow(timeout_seconds=3600)
def my_flow():
    ...
If you can share:
- The flow run ID and the tail of the logs after “Finished in state Completed()”
- Whether this flow uses ThreadPoolExecutor/ProcessPoolExecutor, Kafka, BigQuery Storage, QueueListener/QueueHandler, or any streaming clients
- Whether the Docker container for that run is still up (docker ps -a) while the UI shows RUNNING
…I can help pinpoint the exact leak and give you a specific shutdown pattern to add.