# ask-community
i
@Marvin my flow runs keep crashing. I do see some errors in the runs, but they should be going to a Failed state, not a Crashed state. I am on Prefect 3.1.15.
m
thought for 193.0 seconds
Thanks for the details — I know this is frustrating.

High level
- In Prefect 3.x, a run ends in Failed when a Python exception propagates through the flow and Prefect can record the terminal state.
- A run ends in Crashed when the execution process stops unexpectedly and Prefect cannot report a terminal state (e.g., the process/container/pod is killed, exits abruptly, or loses heartbeat/connectivity).

Docs for reference:
- States in Prefect (includes Crashed)
- Detecting zombie/crashed flows

Why you might see an error in logs but a “Crashed” state
- If the underlying process is terminated (OOMKilled, SIGKILL, container/pod stopped, host reboot, os._exit, a segfault in a C extension, an abrupt multiprocessing exit), Prefect can’t capture the exception to mark Failed — it only sees the heartbeat stop and marks Crashed.
- This often shows up as: some error appears in stdout/stderr, then the worker/pod disappears or restarts.

Quick triage questions
- What worker/infrastructure are you using: Process, Docker, or Kubernetes?
- Do you see a consistent pattern (e.g., after N minutes, on large inputs, at a specific task)?
- For one example run, can you share:
  - The run ID
  - The worker logs around the time it ends
  - Any exit code or termination reason (e.g., 137/OOMKilled)

What to check next (by environment)
- Kubernetes
  - kubectl describe pod <pod> and kubectl logs <pod> (and -p for previous logs if it restarted)
  - Look for Reason “OOMKilled” or exit code 137/143. If OOM, raise memory limits/requests or reduce memory usage.
- Docker
  - docker ps -a and docker inspect <container> to get the exit code; docker logs <container> for container logs.
  - Exit code 137 typically means OOMKilled; increase container memory or reduce workload.
- Process worker (bare metal/VM)
  - Check system logs (dmesg, syslog) for OOM kills or signals; ensure nothing external is sending SIGKILL/SIGTERM.
  - If you’re spawning subprocesses or using libraries that call os._exit, that can cause crashes.

Common causes and fixes
- Out-of-memory (most common): manifests as OOMKilled/exit 137. Fix by increasing memory (K8s/Docker limits) or reducing memory footprint.
- Hard process termination: kill -9, node reboots, preemptible spot instances, container restarts. Ensure infrastructure stability and graceful shutdowns.
- Abrupt exits in code:
  - Avoid os._exit(...) and be careful with sys.exit(...) outside the Prefect-managed flow context.
  - If using multiprocessing, prefer the spawn start method and ensure child processes don’t hard-exit the parent.
- Native/C-extension segfaults (e.g., some pandas/numpy/TensorFlow ops) will crash the interpreter; upgrade the library or isolate that work behind more memory/limits.
- Heartbeat loss: if the worker loses connectivity or the runtime stops heartbeating, Prefect will classify the run as Crashed. Check worker logs for connectivity errors.

How to verify behavior locally
- This should end as Failed:
Copy code
from prefect import flow, get_run_logger

@flow
def will_fail():
    logger = get_run_logger()
    logger.info("About to raise an exception")
    raise RuntimeError("Intentional failure")

will_fail()
- This will simulate a crash (SIGKILL), likely ending as Crashed:
Copy code
from prefect import flow
import os, signal

@flow
def will_crash():
    os.kill(os.getpid(), signal.SIGKILL)

will_crash()
Getting “Failed” instead of “Crashed”
- Ensure failures are normal Python exceptions inside the flow/task code so Prefect can capture them.
- Avoid hard exits and manage memory to prevent OOM.
- If you must use subprocesses, ensure they report errors back instead of terminating the parent (see the sketch at the end of this message).

Optionally, auto-retry on Crash
- Create an Automation to “Rerun” when a flow run enters Crashed. This won’t fix the root cause but can improve resilience while you diagnose. See Detecting zombie/crashed flows for context and patterns.

Next step
If you can share:
- Worker type (Process/Docker/Kubernetes)
- One run ID
- Worker/container/pod logs and any exit code/termination reason
…I can help pinpoint why Prefect is classifying these as Crashed and suggest a targeted fix.

Also, you’re on 3.1.15 — if feasible, consider upgrading to the latest 3.x; there have been numerous stability improvements since 3.1.x.
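For the subprocess point, here is a minimal sketch (the run_etl_step.sh command is a placeholder) of surfacing a child process failure as a normal exception, so the run ends Failed instead of Crashed:
Copy code
from subprocess import run

from prefect import flow


@flow
def wrapped_subprocess() -> str:
    # capture output instead of letting the child take down the parent
    result = run(["bash", "run_etl_step.sh"], capture_output=True, text=True)  # placeholder command
    if result.returncode != 0:
        # raising keeps this a normal Failed state that Prefect can record
        raise RuntimeError(
            f"Subprocess exited with {result.returncode}: {result.stderr.strip()}"
        )
    return result.stdout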
i
@Marvin we are using process workers. And this is the failure logs of a run. This is marked as Crashed, where it looks like it should have been marked as failed Worker 'ProcessWorker 055ca9c8-141e-4ccd-bebd-37526875290b' submitting flow run '8aeb6661-e0dd-4f75-8397-f463cab45ea6' 043951 PM prefect.flow_runs.worker Opening process... 043952 PM prefect.flow_runs.worker Completed submission of flow run '8aeb6661-e0dd-4f75-8397-f463cab45ea6' 043952 PM prefect.flow_runs.worker Beginning flow run 'cream-hound' for flow 'deployment-schedule-flow-v4' 044000 PM prefect.flow_runs Encountered exception during execution: AirbyteServerNotHealthyException() Traceback (most recent call last): File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 170, in trigger_manual_sync_connection response.raise_for_status() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_models.py", line 829, in raise_for_status raise HTTPStatusError(message, request=request, response=self) httpx.HTTPStatusError: Client error '409 Conflict' for url 'http://internal-a113cfb39e1794b17b427cf96671af99-319922687.ap-south-1.elb.amazonaws.com/api/v1/connections/sync/' For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409 The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 755, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1363, in run_flow_sync engine.call_flow_fn() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 775, in call_flow_fn result = call_with_parameters(self.flow.fn, self.parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "/tmp/tmpspk2d53rprefect/prefect-proxy-main/proxy/prefect_flows.py", line 316, in deployment_schedule_flow_v4 run_airbyte_connection_flow_v1(task_config) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flows.py", line 1635, in call return run_flow( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1520, in run_flow ret_val = run_flow_sync(**kwargs) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync return engine.state if return_type == "state" else engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 335, in result _result = run_coro_as_sync(_result) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 209, in run_coro_as_sync return call.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 330, in result return self.future.result(timeout=timeout) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 193, in result return self.fect/flow_engine.py", line 1339, in call_flow_fn result = await call_with_parameters(self.flow.fn, self.parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/flows.py", line 62, in run_connection_sync airbyte_sync = await 
task(airbyte_connection.trigger.aio)(airbyte_connection) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1416, in run_task_async return engine.state if return_type == "state" else await engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1016, in result raise self._raised File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1337, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1414, in run_task_async await engine.call_task_fn(txn) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1360, in call_task_fn result = await call_with_parameters(self.task.fn, parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 388, in trigger ) = await airbyte_client.trigger_manual_sync_connection( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 182, in trigger_manual_sync_connection raise err.AirbyteServerNotHealthyException() from e prefect_airbyte.exceptions.AirbyteServerNotHealthyException 044153 PM prefect.flow_runs Finished in state Failed('Flow run encountered an exception: AirbyteServerNotHealthyException: ') 044154 PM prefect.flow_runs Engine execution exited with unexpected exception Traceback (most recent call last): File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 170, in trigger_manual_sync_connection response.raise_for_status() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_models.py", line 829, in raise_for_status raise HTTPStatusError(message, request=request, response=self) httpx.HTTPStatusError: Client error '409 Conflict' for url 'http://internal-a113cfb39e1794b17b427cf96671af99-319922687.ap-south-1.elb.amazonaws.com/api/v1/connections/sync/' For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409 The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1520, in run_flow ret_val = run_flow_sync(**kwargs) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync return engine.state if return_type == "state" else engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 340, in result raise self._raised File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 755, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1363, in run_flow_sync engine.call_flow_fn() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 775, in call_flow_fn result = call_with_parameters(self.flow.fn, self.parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "/tmp/tmpspk2d53rprefect/prefect-proxy-main/proxy/prefect_flows.py", line 316, in deployment_schedule_flow_v4 run_airbyte_connection_flow_v1(task_config) File 
"/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flows.py", line 1635, in call return run_flow( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1520, in run_flow ret_val = run_flow_sync(**kwargs) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync return engine.state if return_type == "state" else engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 335, in result _result = run_coro_as_sync(_result) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 209, in run_coro_as_sync return call.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 330, in result return self.future.result(timeout=timeout) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 193, in result return self.__get_result() File "/home/ddp/.local/share/uv/python/cpython-3.10.12-linux-aarch64-gnu/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result raise self._exception File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 403, in _run_async result = await coro File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 190, in coroutine_wrapper return await task File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1389, in run_flow_async return engine.state if return_type == "state" else await engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 907, in result raise self._raised File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1325, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1387, in run_flow_async await engine.call_flow_fn() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1339, in call_flow_fn result = await call_with_parameters(self.flow.fn, self.parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/flows.py", line 62, in run_connection_sync airbyte_sync = await task(airbyte_connection.trigger.aio)(airbyte_connection) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1416, in run_task_async return engine.state if return_type == "state" else await engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1016, in result raise self._raised File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1337, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1414, in run_task_async await engine.call_task_fn(txn) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1360, in call_task_fn result = await call_with_parameters(self.task.fn, parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 388, in trigger ) = await airbyte_client.trigger_manual_sync_connection( File 
"/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 182, in trigger_manual_sync_connection raise err.AirbyteServerNotHealthyException() from e prefect_airbyte.exceptions.AirbyteServerNotHealthyException 044154 PM prefect.flow_runs Process 2268059 exited with status code: 1 044155 PM prefect.flow_runs.worker Reported flow run '8aeb6661-e0dd-4f75-8397-f463cab45ea6' as crashed: Flow run infrastructure exited with non-zero status code 1. 044155 PM
m
Your message was too long, here's your message at the allowed limit: we are using process workers And this is the failure logs of a run This is marked as Crashed where it looks like it should have been marked as failed …
i
@Marvin I see a high number of AccessShareLocks in Prefect's database
m
thought for 323.1 seconds
Thanks — that’s a helpful clue.

What “AccessShareLock” means
- AccessShareLock is the lightweight read lock Postgres takes for SELECTs. It only conflicts with AccessExclusiveLock (DDL like ALTER TABLE) and does not block normal reads/writes.
- Seeing many AccessShareLocks often just means you have lots of concurrent reads (e.g., flow/task/run queries, logs, schedules, automations). On its own, that’s usually normal.
- However, if those reads are slow or piled up, they can starve the API, which can delay heartbeats and lead Prefect to mark runs as Crashed.

First, check whether locks are actually causing waits
Run these on your Prefect database to see if AccessShareLocks are long-lived or blocking:
- Count lock types
Copy code
select l.mode, count(*)
from pg_locks l
join pg_database d on d.oid = l.database
where d.datname = current_database()
group by l.mode
order by count desc;
- Which queries are holding AccessShareLock the longest
Copy code
select a.pid,
       a.application_name,
       a.usename,
       a.state,
       a.query_start,
       now() - a.query_start as age,
       a.query
from pg_locks l
join pg_stat_activity a on l.pid = a.pid
where l.mode = 'AccessShareLock'
  and a.datname = current_database()
order by age desc
limit 50;
- See blocked vs blocking (if anything is actually waiting)
Copy code
select bl.pid as blocked_pid,
       ba.query as blocked_query,
       now() - ba.query_start as blocked_age,
       ka.pid as blocking_pid,
       ka.query as blocking_query,
       now() - ka.query_start as blocking_age,
       kl.mode as blocking_lock_mode
from pg_locks bl
join pg_stat_activity ba on ba.pid = bl.pid
join pg_locks kl on kl.locktype = bl.locktype
  and coalesce(kl.database,0) = coalesce(bl.database,0)
  and coalesce(kl.relation,0) = coalesce(bl.relation,0)
  and coalesce(kl.page,0) = coalesce(bl.page,0)
  and coalesce(kl.tuple,0) = coalesce(bl.tuple,0)
  and coalesce(kl.virtualxid,'') = coalesce(bl.virtualxid,'')
  and coalesce(kl.transactionid,0) = coalesce(bl.transactionid,0)
  and coalesce(kl.classid,0) = coalesce(bl.classid,0)
  and coalesce(kl.objid,0) = coalesce(bl.objid,0)
  and coalesce(kl.objsubid,0) = coalesce(bl.objsubid,0)
  and kl.pid != bl.pid
join pg_stat_activity ka on ka.pid = kl.pid
where not bl.granted;
If you can paste a few rows from these, I can help pinpoint which Prefect endpoints/queries are hottest.

Why Prefect might be generating many reads
- Flow/task run listing, details pages, and the UI auto-refresh
- Worker polling for runs and heartbeat updates
- Log ingestion and log reads
- Automations/concurrency checks and schedules

Docs that may help
- Self-hosting Prefect (deployment + DB notes)
- Database configuration API (pool size, timeouts, etc.)
- Prefect blog: Postgres partitioning

Mitigations and tuning
- Verify it’s not DDL: avoid running migrations/DDL (which need AccessExclusiveLock) during peak traffic — that will queue behind all AccessShareLocks and cause blocking.
- Shorten long reads:
  - Ensure autovacuum is healthy to avoid bloat that makes SELECTs slow.
  - Keep the DB on sufficient hardware (CPU/IO), and watch for high sequential scans or slow index usage.
  - If you have very large logs/events history, consider archiving/partitioning to keep hot tables small.
- Prefect server DB pool limits:
  - Prefect’s async Postgres config supports pool and timeout options like sqlalchemy_pool_size, sqlalchemy_max_overflow, and timeouts.
  - If you’ve scaled API replicas up, make sure the aggregate connections aren’t overwhelming Postgres; capping pool size or placing PgBouncer (transaction pooling) in front can help smooth spikes.
  - Reference: AsyncPostgresConfiguration parameters
- Guardrails:
  - Consider a Postgres statement_timeout for read queries to prevent runaway scans.
  - Reduce log write/read pressure if possible (lower log level, batch/forward fewer logs to the API) — see logging handlers: Logging to API
- Correlate with “Crashed” timing:
  - If API latency/DB waits spike around the times your runs “crash”, that’s likely heartbeats timing out due to backpressure. Reducing contention should reduce Crashed classifications. (A small polling sketch to help with this correlation is at the end of this message.)

A few clarifying questions to tailor the next steps
- Are you self-hosting Prefect Server with Postgres? What Postgres version and instance size?
- Roughly how many API replicas do you run, and do you have PgBouncer in front?
- Do you see blocked queries (not just many AccessShareLocks), or API 5xx/slow responses in server logs?
- About how large are your flow_runs, task_runs, and logs tables?

If you can share the outputs of the SQL above plus your API/DB topology, I’ll suggest concrete tuning (pool sizes, PgBouncer settings, and any Prefect-side configuration) to bring lock pressure down and stabilize your runs.
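If it helps to correlate lock pressure with the times runs are marked Crashed, here is a small monitoring sketch (plain asyncpg, not a Prefect API; asyncpg is assumed to be installed alongside Prefect's Postgres driver, and the DSN is a placeholder):
Copy code
import asyncio
from datetime import datetime, timezone

import asyncpg  # assumed available next to Prefect's Postgres driver

LOCK_QUERY = """
select l.mode, count(*)
from pg_locks l
join pg_database d on d.oid = l.database
where d.datname = current_database()
group by l.mode
"""


async def watch_locks(dsn: str, interval: float = 10.0) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        while True:
            rows = await conn.fetch(LOCK_QUERY)
            stamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
            # print lock-mode counts with a timestamp so spikes can be lined
            # up against Crashed flow runs later
            print(stamp, {r["mode"]: r["count"] for r in rows})
            await asyncio.sleep(interval)
    finally:
        await conn.close()


if __name__ == "__main__":
    # placeholder DSN -- point this at the Prefect database
    asyncio.run(watch_locks("postgresql://user:password@localhost:5432/prefect"))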
i
@Marvin i am seeing stuff like this 9|prefect- | sqlalchemy.exc.DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.DeadlockDetectedError'>: deadlock detected 9|prefect- | DETAIL: Process 6593 waits for ShareLock on transaction 105382407; blocked by process 6972. 9|prefect- | Process 6972 waits for ShareLock on transaction 105382406; blocked by process 6593. 9|prefect- | HINT: See server log for query details. 9|prefect- | [SQL: UPDATE deployment SET last_polled=$1::TIMESTAMP WITH TIME ZONE, status=$2::deployment_status, updated=now() WHERE deployment.id IN (NULL) AND (1 != 1) OR deployment.work_queue_id IN ($3::UUID)] could this cause the runs to crash ?
m
thought for 69.0 seconds
Short answer: yes — repeated DB deadlocks like the one you pasted can absolutely cascade into “Crashed” runs.

What’s happening
- Your DB is reporting a Postgres deadlock during an UPDATE on deployments:
  - UPDATE deployment SET last_polled = ..., status = ... WHERE ... work_queue_id IN ($3::UUID)
- This UPDATE is part of the polling path that marks deployments/queues as polled. If two processes hit overlapping rows in different orders, Postgres can deadlock and abort one transaction.
- When the API process handling a heartbeat/state update/polling request gets a 40P01 (deadlock detected), SQLAlchemy rolls back. If this happens frequently or causes slowdowns/timeouts, workers can’t send heartbeats or record terminal states reliably — runs may then be marked Crashed instead of Failed.

Why you’re seeing deadlocks
- Multiple workers polling the same work queue (or many API replicas/schedulers) can issue concurrent updates to the same deployment rows.
- High concurrency increases the odds of overlapping write sets and deadlocks.

What to do next
- Ensure one worker per work queue, or shard deployments across multiple queues so each worker polls a distinct queue. This alone typically eliminates these UPDATE conflicts.
- If you’ve scaled API/server replicas, ensure you’re not running multiple instances of services that aggressively update the same rows at the same time.
- Upgrade Prefect. You’re on 3.1.15; there have been many stability and orchestration improvements since then. Try the latest 3.x to reduce DB contention and improve error handling. Release notes: Prefect releases
- Postgres observability:
  - Enable log_lock_waits and set a low deadlock_timeout to capture details in the DB logs.
  - Confirm who/what is colliding by checking pg_stat_activity when a deadlock occurs; look for multiple sessions updating the same deployments/work_queue_id.
- General DB hygiene:
  - Make sure autovacuum is keeping tables and indexes healthy; slow scans keep locks open longer.
  - If you’re self-hosting with high traffic, consider putting PgBouncer in front to cap connection spikes (it doesn’t fix deadlocks, but reduces overall pressure).
- Temporary resilience:
  - Add an Automation to rerun flow runs that enter Crashed while you reduce contention. Context: Detecting zombie/crashed flows
- Server DB config knobs (if you need to tune pool/timeout):
  - Prefect’s async Postgres config supports pool and timeout params (sqlalchemy_pool_size, sqlalchemy_max_overflow, timeout, connection_timeout). Reference: Database configuration API

Quick confirmation steps
- Are multiple workers polling the same work queue? If yes, try running a single worker per queue (or split into multiple queues) and see if the deadlocks disappear.
- Do the timestamps for these deadlocks line up with Crashed runs or missed heartbeats?
- Are you running multiple API replicas with services enabled on each? If so, let's verify which services should be singleton vs replicated for your setup.

If you can share:
- How many workers and how many work queues (and whether multiple workers share a queue)
- Whether your Prefect API is scaled horizontally
- A snippet from pg_stat_activity or the DB logs during a deadlock
…I can suggest a concrete, least-effort change to stop the deadlocks and stabilize your runs.
i
@Marvin I have 4 workers listening to the same queue, and another queue with 3 workers listening to it.
m
thought for 115.2 seconds
Thanks — that configuration explains the deadlocks.

Yes: having multiple workers polling the same work queue will issue concurrent updates against the same deployment rows (setting last_polled/status). Under load, two transactions can lock those rows in different orders and Postgres will raise “deadlock detected” — exactly what you’re seeing. When this repeats, it can slow/abort API operations (heartbeats, state writes), which shows up as Crashed runs.

What I recommend
- Short-term, prove it: drop to 1 worker per queue
  - Run a single worker on each of your two queues. You should see the deadlocks stop and flow runs stabilize (Crashed → normal).
- Scale by sharding, not by stacking workers on a queue
  - If you need more throughput, create additional work queues and assign deployments to them (e.g., four queues with one worker each), instead of 4 workers on one queue.
  - CLI for queues:
Copy code
prefect work-queue create <name>
prefect work-queue ls
- Start workers pointing at distinct queues:
Copy code
prefect worker start -p <pool> -q <queue-A>
prefect worker start -p <pool> -q <queue-B>
...
Reference: Work pools and queues
- Stagger polling if you must share a queue
  - If you can’t shard right now, stagger worker polling to reduce simultaneous row updates. Set slightly different query intervals per worker:
Copy code
# worker 1
export PREFECT_WORKER_QUERY_SECONDS=7
prefect worker start -p <pool> -q <queue>

# worker 2
export PREFECT_WORKER_QUERY_SECONDS=11
prefect worker start -p <pool> -q <queue>

# worker 3
export PREFECT_WORKER_QUERY_SECONDS=13
prefect worker start -p <pool> -q <queue>

# worker 4
export PREFECT_WORKER_QUERY_SECONDS=17
prefect worker start -p <pool> -q <queue>
Worker settings reference (heartbeat/query/prefetch): Settings reference
- Upgrade Prefect to the latest 3.x
  - You’re on 3.1.15; there have been many orchestration and DB-path improvements since. Upgrading can reduce contention and improve error handling.
  - Releases: Prefect releases
- DB observability and guardrails
  - Enable Postgres log_lock_waits = on and set a low deadlock_timeout to capture detail.
  - Ensure autovacuum is keeping indexes/tables tidy so transactions are short.
  - Keep an eye on API latency around deadlock times; if heartbeats slow, Crashed rates will rise.

If you can try “1 worker per queue” for 15–30 minutes and confirm deadlocks drop to zero, we can plan a clean rollout: shard queues or stagger poll intervals to scale safely. To assign deployments to specific queues from Python, see the sketch below.
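As a sketch of assigning a deployment to a dedicated queue from Python (the repo URL, entrypoint, pool, and queue names are placeholders for your project; this assumes the 3.x flow.from_source(...).deploy(..., work_queue_name=...) pattern):
Copy code
from prefect import flow

# a sketch, not a drop-in: source, entrypoint, pool, and queue names are placeholders
flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="proxy/prefect_flows.py:deployment_schedule_flow_v4",
).deploy(
    name="type-x-shard-1",
    work_pool_name="your-process-pool",
    work_queue_name="type-x-1",  # the queue a single dedicated worker polls
)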
i
@Marvin what are the best practices here? I want to have a concurrency of 4 for a particular type of flows. Currently, I am sending them to a single queue and hoping multiple workers pick up different work & execute concurrently. Please correct me if I got anything wrong.
m
thought for 188.4 seconds
Great question — you’re close, just need a slight shift in approach to get safe concurrency without the deadlocks you’re seeing.

Best-practice patterns to get concurrency of 4 for a “type” of flows

Option A (simplest) — one queue, one worker, set queue concurrency to 4
- Put all “type-X” deployments on a dedicated work queue.
- Run a single worker that polls that queue.
- Set the work queue’s concurrency limit to 4 — Prefect will ensure only 4 active runs from that queue at a time.
A single worker can launch multiple runs in parallel; you do not need multiple workers on the same queue to achieve concurrency.

Commands:
Copy code
# create or confirm the queue
prefect work-queue create type-x

# cap concurrency for that queue at 4
prefect work-queue set-concurrency-limit type-x 4

# start a single worker for that queue
prefect worker start -p <your-pool> -q type-x
Docs:
- Work queue CLI
- Work pools and queues

Option B — shard into multiple queues, one worker per queue
- Create 4 queues (type-x-1..4), run one worker per queue.
- Assign deployments across those queues. If you want an exact total of 4 concurrent runs, set each queue’s limit to 1.
- This avoids multiple workers hitting the same deployment rows and eliminates the deadlock you saw.

Commands:
Copy code
prefect work-queue create type-x-1
prefect work-queue create type-x-2
prefect work-queue create type-x-3
prefect work-queue create type-x-4

prefect work-queue set-concurrency-limit type-x-1 1
prefect work-queue set-concurrency-limit type-x-2 1
prefect work-queue set-concurrency-limit type-x-3 1
prefect work-queue set-concurrency-limit type-x-4 1

# one worker per queue
prefect worker start -p <your-pool> -q type-x-1
prefect worker start -p <your-pool> -q type-x-2
prefect worker start -p <your-pool> -q type-x-3
prefect worker start -p <your-pool> -q type-x-4
Option C — global concurrency limit across many queues
- If “type-X” flows are spread across multiple queues/pools, use a Global Concurrency Limit (GCL) with a shared name/tag for those flows to cap them at 4 in total (see the Python sketch at the end of this message).
Copy code
prefect gcl create type-x-cap --limit 4
Docs: Global concurrency limits

What to avoid (and why)
- Multiple workers polling the same queue. This can cause concurrent UPDATEs to the same deployment/work-queue rows (e.g., last_polled/status) and lead to Postgres deadlocks. You’ve already hit this.
- If you must temporarily share a queue, stagger poll intervals to reduce collisions:
Copy code
# different workers with different intervals
export PREFECT_WORKER_QUERY_SECONDS=7;  prefect worker start -p <pool> -q <queue>
export PREFECT_WORKER_QUERY_SECONDS=11; prefect worker start -p <pool> -q <queue>
export PREFECT_WORKER_QUERY_SECONDS=13; prefect worker start -p <pool> -q <queue>
export PREFECT_WORKER_QUERY_SECONDS=17; prefect worker start -p <pool> -q <queue>
Settings reference: Settings reference

Recommendation for your setup right now
- Move to Option A: keep a single worker per queue and set a work-queue concurrency limit of 4. This will give you the parallelism you want and eliminate the deadlocks that can cascade into Crashed runs.
- Or use Option B if you prefer multiple workers: shard into multiple queues with one worker per queue.

Also consider upgrading from 3.1.15 to the latest 3.x — there have been stability improvements in orchestration/DB paths. Releases: Prefect releases

If you share how your “type-X” deployments are assigned today, I’m happy to suggest an exact queue layout and limits to get you to 4 concurrent safely.
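For Option C, here is a minimal Python sketch of occupying the limit from inside a flow (this assumes a limit named type-x-cap created as in the CLI example above, and the prefect.concurrency.sync helper available in 3.x):
Copy code
from prefect import flow
from prefect.concurrency.sync import concurrency


def do_the_work():
    # placeholder for the actual flow body
    ...


@flow
def type_x_flow():
    # blocks until one of the 4 "type-x-cap" slots is free, then holds it
    # for the duration of the block
    with concurrency("type-x-cap", occupy=1):
        do_the_work()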
i
@Marvin will the first approach of a single worker be able to handle concurrency where flow runs run for a long time?
m
thought for 63.4 seconds
Yes. A single worker can sustain many concurrent, long-running flow runs.

How it works
- The worker’s job is to poll the queue, submit flow runs to your chosen infrastructure, and keep heartbeating. It does not “block” on a run finishing before it can submit more.
- Concurrency comes from the infrastructure:
  - Process worker: starts multiple Python processes on the host; each run is its own process.
  - Docker worker: creates multiple containers; each run is its own container.
  - Kubernetes worker: creates multiple pods; each run is its own pod.
- As long as your host/cluster has capacity, one worker can keep launching new runs up to your limits.

What determines “how many at once”
- Work-queue concurrency limit: caps how many runs from that queue are allowed to be active at a time. This is the simplest way to guarantee “4 at once”.
- Infrastructure capacity: CPU/memory available to spawn processes/containers/pods.
- Worker polling settings: PREFECT_WORKER_QUERY_SECONDS, PREFECT_WORKER_PREFETCH_SECONDS, and PREFECT_WORKER_HEARTBEAT_SECONDS control how often the worker fetches work and reports health. Defaults are fine for most setups, but you can increase PREFECT_WORKER_PREFETCH_SECONDS slightly if you want it to grab more work ahead of time.

Good defaults
- Use one worker per queue with a queue concurrency limit of 4. That worker will steadily launch up to 4 flow runs and maintain them even if they each run for hours.
- Ensure your infra has capacity: for process workers, enough CPU/RAM; for Docker/K8s, check container/pod quotas and per-run resource requests/limits.

When to use more than one worker
- High availability across nodes or AZs
- Separate infra types (e.g., GPU vs CPU)
- Very high scheduling throughput (many short runs per minute)
But avoid multiple workers on the same queue unless you shard queues or stagger polling; otherwise you can reintroduce the deadlocks we saw.

If you share which worker type you’re using (process/docker/k8s) and typical run runtime/resource needs, I can suggest concrete resource settings to comfortably sustain 4 long-running runs.
i
@Marvin do you understand whats happening here ? I keep running into timeout errors Encountered exception during execution: ReadTimeout('') Traceback (most recent call last): File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions yield File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 394, in handle_async_request resp = await self._pool.handle_async_request(req) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 256, in handle_async_request raise exc from None File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 236, in handle_async_request response = await connection.handle_async_request( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 103, in handle_async_request return await self._connection.handle_async_request(request) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 136, in handle_async_request raise exc File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 106, in handle_async_request ) = await self._receive_response_headers(**kwargs) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 177, in _receive_response_headers event = await self._receive_event(timeout=timeout) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 217, in _receive_event data = await self._network_stream.read( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_backends/anyio.py", line 32, in read with map_exceptions(exc_map): File "/home/ddp/.local/share/uv/python/cpython-3.10.12-linux-aarch64-gnu/lib/python3.10/contextlib.py", line 153, in exit self.gen.throw(typ, value, traceback) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions raise to_exc(exc) from exc httpcore.ReadTimeout The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 755, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1363, in run_flow_sync engine.call_flow_fn() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 775, in call_flow_fn result = call_with_parameters(self.flow.fn, self.parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "/tmp/tmp5usotnrvprefect/prefect-proxy-main/proxy/prefect_flows.py", line 316, in deployment_schedule_flow_v4 run_airbyte_connection_flow_v1(task_config) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flows.py", line 1635, in call return run_flow( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1520, in run_flow ret_val = run_flow_sync(**kwargs) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1365, in run_flow_sync return engine.state if return_type == "state" else engine.result() File 
"/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 335, in result _result = run_coro_as_sync(_result) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 209, in run_coro_as_sync return call.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 330, in result return self.future.result(timeout=timeout) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 193, in result return self.__get_result() File "/home/ddp/.local/share/uv/python/cpython-3.10.12-linux-aarch64-gnu/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result raise self._exception File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 403, in _run_async result = await coro File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 190, in coroutine_wrapper return await task File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1389, in run_flow_async return engine.state if return_type == "state" else await engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 907, in result raise self._raised File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1325, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1387, in run_flow_async await engine.call_flow_fn() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 1339, in call_flow_fn result = await call_with_parameters(self.flow.fn, self.parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/flows.py", line 62, in run_connection_sync airbyte_sync = await task(airbyte_connection.trigger.aio)(airbyte_connection) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1416, in run_task_async return engine.state if return_type == "state" else await engine.result() File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1016, in result raise self._raised File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1337, in run_context yield self File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1414, in run_task_async await engine.call_task_fn(txn) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1360, in call_task_fn result = await call_with_parameters(self.task.fn, parameters) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 388, in trigger ) = await airbyte_client.trigger_manual_sync_connection( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 167, in trigger_manual_sync_connection response = await self._client.post( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1859, in post return await self.request( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1540, in request return await self.send(request, auth=auth, 
follow_redirects=follow_redirects) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1629, in send response = await self._send_handling_auth( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1657, in _send_handling_auth response = await self._send_handling_redirects( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1694, in _send_handling_redirects response = await self._send_single_request(request) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1730, in _send_single_request response = await transport.handle_async_request(request) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 393, in handle_async_request with map_httpcore_exceptions(): File "/home/ddp/.local/share/uv/python/cpython-3.10.12-linux-aarch64-gnu/lib/python3.10/contextlib.py", line 153, in exit self.gen.throw(typ, value, traceback) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 118, in map_httpcore_exceptions raise mapped_exc(message) from exc httpx.ReadTimeout 021144 AM prefect.flow_runs Finished in state Failed('Flow run encountered an exception: ReadTimeout: ') 021145 AM prefect.flow_runs Engine execution exited with unexpected exception Traceback (most recent call last): File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions yield File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 394, in handle_async_request resp = await self._pool.handle_async_request(req) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 256, in handle_async_request raise exc from None File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 236, in handle_async_request response = await connection.handle_async_request( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 103, in handle_async_request return await self._connection.handle_async_request(request) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 136, in handle_async_request raise exc File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 106, in handle_async_request ) = await self._receive_response_headers(**kwargs) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 177, in _receive_response_headers event = await self._receive_event(timeout=timeout) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 217, in _receive_event data = await self._network_stream.read( File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_backends/anyio.py", line 32, in read with map_exceptions(exc_map): File "/home/ddp/.local/share/uv/python/cpython-3.10.12-linux-aarch64-gnu/lib/python3.10/contextlib.py", line 153, in exit self.gen.throw(typ, value, traceback) File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions raise to_exc(exc) from exc httpcore.ReadTimeout The above exception was the direct cause of the following exception:
m
Your message was too long, here's your message at the allowed limit: do you understand whats happening here? I keep running into timeout errors Encountered exception during execution ReadTimeout('') …
i
@Marvin I see some httpx timeout errors in the run. We have multiple subflows and it times out at some point in the middle. Also, why this is reported as Crashed is something I am not able to understand.
m
thought for 364.6 seconds
Thanks for flagging this — two things are likely happening here:

1) httpx timeout errors mid-run
- Subflows trigger many API calls (create/run subflow, send heartbeats, send logs, set states). If an API call takes longer than the client’s request timeout or the network stalls, httpx will raise a timeout.
- By default in Prefect 3.x, the API request timeout is 60s. Bursty log uploads, many concurrent subflows, slow proxies/load balancers, or periodic network hiccups can push calls over that limit.
- If you’re launching many subflows concurrently, you may hit transient API latency or connection limits and see timeouts sporadically “in the middle.”

Tuning suggestions
- Increase the API request timeout and client retries:
Copy code
# give calls more time (e.g. 180–300s)
export PREFECT_API_REQUEST_TIMEOUT=180
# add more resilience to transient issues
export PREFECT_CLIENT_MAX_RETRIES=8
# optionally add extra retryable status codes
export PREFECT_CLIENT_RETRY_EXTRA_CODES=429,502,503,504
- Optional: enable HTTP/2 to multiplex requests if your network allows it:
Copy code
export PREFECT_API_ENABLE_HTTP2=true
- Reduce API chatter from logs to rule out log uploads as the cause:
Copy code
# temporarily disable log shipping to the API
export PREFECT_LOGGING_TO_API_ENABLED=false
# or increase log batch interval/size to send fewer, larger requests
export PREFECT_LOGGING_TO_API_BATCH_INTERVAL=5
- Add retries on your flow/subflow:
Copy code
from prefect import flow

@flow
def child():
    ...

@flow(retries=2, retry_delay_seconds=30)
def parent():
    # For a subflow, you can also apply retry options at call time:
    result = child.with_options(retries=2, retry_delay_seconds=15)()
    return result
- If you’re fanning out many subflows at once, consider staggering or batching them to lower API pressure.
2) Why the run is reported as Crashed
“Crashed” means the backend stopped receiving heartbeats from the process running your flow (or the process terminated unexpectedly). Common causes:
- The process/pod/container was killed (OOM, eviction, preemption, host restart).
- The runner stopped sending heartbeats due to a long blocking call or heavy CPU/GIL contention.
- A long network partition prevented heartbeats and state updates.
- Severe logging or network backpressure stalled the event loop long enough to miss heartbeats.
What you can tune/check
- Ensure heartbeats are frequent enough (minimum allowed is 30s):
Copy code
export PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30
- Check for infra-level terminations or OOM in your worker logs, container logs, or Kubernetes events.
- If you have very CPU-bound sections, consider yielding control periodically or moving the heavy work out of the flow function to tasks/subprocesses so the runner can keep sending heartbeats.
Diagnostics to help pinpoint
- Share a short snippet of the exact httpx timeout error text (connect vs read vs write vs pool timeout is very telling).
- Are you on Prefect Cloud or self-hosted Prefect Server? If Server, is there a reverse proxy with aggressive timeouts (e.g., 60s idle/read timeout)? That can cause mid-run timeouts.
- Which worker and infrastructure type are you using (process, Docker, Kubernetes)? Any signs of OOM/eviction?
- Roughly how many subflows are launched concurrently when this happens?
- Check the run timeline for “missed heartbeat” windows: Detect zombie flows (crash detection)
- If on Cloud, this guide can help spot environment issues: Troubleshoot Prefect Cloud
- Confirm effective settings with:
Copy code
prefect profile inspect
If you can share:
- A snippet of the httpx stack trace (connect/read/write/total timeout),
- Where in the subflow fan-out it occurs,
- Your environment (Cloud vs Server, worker type, k8s/pod details, any proxy),
…I can recommend more targeted changes.
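If it’s awkward to set these as shell environment variables on the worker, a minimal sketch of applying the same settings from the script that runs your flow is below (the values are illustrative, and the environment must be set before Prefect or your flow code is imported); setting them in the worker’s environment is equivalent and usually easier to manage:
Copy code
import os

# Prefect reads these from the environment, so set them early in the process,
# before importing prefect or any flow code.
os.environ.setdefault("PREFECT_API_REQUEST_TIMEOUT", "180")
os.environ.setdefault("PREFECT_CLIENT_MAX_RETRIES", "8")
os.environ.setdefault("PREFECT_CLIENT_RETRY_EXTRA_CODES", "429,502,503,504")

from prefect import flow  # imported after the environment is configured

@flow
def parent():
    ...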
i
@Marvin This is the snippet of the error that I see:
File "/home/ddp/.local/share/uv/python/cpython-3.10.12-linux-aarch64-gnu/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/mnt/appdata/prefect-proxy/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 118, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
m
thought for 49.1 seconds
Thanks for sharing the stack snippet — httpx.ReadTimeout specifically means the request connected and started waiting for the response body, but the read didn’t complete within the client’s read timeout. In Prefect 3.x the default API request timeout is 60s, so long-running/slow API responses (or a proxy/load-balancer with ~60s read/idle timeouts) can trigger this “in the middle” of a run, especially with many subflows.
Why it may be reported as Crashed
- Prefect marks a run as Crashed when heartbeats stop (e.g., the process is blocked/killed or can’t reach the API long enough). A long, blocking network call that repeatedly times out can cause missed heartbeats, so the backend assumes the process died. See Detect zombie flows.
Quick mitigations to try
- Increase the API request timeout (try 180–300s) and client retries:
Copy code
export PREFECT_API_REQUEST_TIMEOUT=180
export PREFECT_CLIENT_MAX_RETRIES=8
export PREFECT_CLIENT_RETRY_EXTRA_CODES=429,502,503,504
- Optional: enable HTTP/2 if your network supports it (can improve multiplexing under load):
Copy code
export PREFECT_API_ENABLE_HTTP2=true
- Reduce API chatter from log uploads (to rule out log shipping timeouts):
Copy code
export PREFECT_LOGGING_TO_API_ENABLED=false
# or, send fewer batches
export PREFECT_LOGGING_TO_API_BATCH_INTERVAL=5
- Add retries to the parent/subflows for resilience against transient failures:
Copy code
from prefect import flow

@flow
def child():
    ...  # stand-in for your real subflow

@flow(retries=2, retry_delay_seconds=30)
def parent():
    child_res = child.with_options(retries=2, retry_delay_seconds=15)()
    return child_res
- Ensure heartbeats aren’t too sparse (min allowed is 30s):
Copy code
export PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30
Diagnostics to pinpoint the exact call timing out - Turn on debug logging and (temporarily) keep logs local:
Copy code
export PREFECT_LOGGING_LEVEL=DEBUG
export PREFECT_LOGGING_TO_API_ENABLED=false
- Also capture httpx/httpcore debug logs in your run entrypoint to see which endpoint is stalling:
Copy code
import logging

# Surface the Prefect client's HTTP activity so you can see which endpoint stalls
logging.getLogger("httpx").setLevel("DEBUG")
logging.getLogger("httpcore").setLevel("DEBUG")
- Confirm the active settings:
Copy code
prefect profile inspect
- If you’re self-hosting Prefect Server behind a reverse proxy (NGINX/ALB/etc.), check and raise proxy_read_timeout/idle timeouts beyond 60s. 60s defaults are a very common cause of mid-run ReadTimeouts.
A few clarifying questions so I can give more targeted guidance:
- Are you on Prefect Cloud or self-hosted Prefect Server? If Server, is there a proxy/ingress in front of it?
- Which worker/infrastructure are you using (process, Docker, Kubernetes)? Any OOM/eviction or node restarts around the same time?
- Roughly how many subflows are launched concurrently when the timeout occurs?
- Does the timeout happen close to the 60s mark? If yes, that strongly suggests a client or proxy timeout.
Related docs:
- Crash detection and “zombie” runs: Detect zombie flows
- Worker/runner healthchecks: Worker healthchecks
- Troubleshooting Cloud: Troubleshoot Prefect Cloud
If you can share a slightly larger slice of the stack (ideally a few DEBUG lines showing the URL/endpoint right before the timeout) and your environment details, I’ll help zero in on the exact bottleneck and the best setting(s) to adjust.
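If you want a quick sanity check of the API path from the worker host, here is a minimal sketch (assuming a self-hosted Server with PREFECT_API_URL set and the standard /health route; the route differs on Cloud) that times a plain request with a generous client timeout. It only exercises reachability and latency through any proxy, not a slow endpoint, but a failure near the 60s mark here points at the proxy/load balancer rather than the Prefect client:
Copy code
import os
import time

import httpx

api_url = os.environ["PREFECT_API_URL"].rstrip("/")

start = time.monotonic()
try:
    # Generous timeout so any ~60s cutoff comes from the network path, not this client
    response = httpx.get(f"{api_url}/health", timeout=300)
    print(f"status={response.status_code} elapsed={time.monotonic() - start:.1f}s")
except httpx.HTTPError as exc:
    print(f"{type(exc).__name__} after {time.monotonic() - start:.1f}s: {exc}")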
i
@Marvin can I restart the Prefect server while the worker is running a job?
m
thought for 123.8 seconds
Short answer: yes, but expect side effects.
What happens if you restart Prefect Server while a worker is running a flow
- The code that’s already running in your worker keeps executing; it does not depend on a constant API connection to compute.
- During the outage, API interactions will fail:
  - Heartbeats and log shipping will time out; after enough missed heartbeats, the backend will mark the run as Crashed. See Detect zombie flows.
  - Any operation that needs the API right then (e.g., creating a subflow run, persisting state/artifacts, result uploads) will raise httpx errors and may fail unless you have retries.
- When the server comes back, workers resume communicating; however:
  - Active runs may have gaps in logs and could appear as Crashed even if they “logically” finished during the outage (because the final state update couldn’t be persisted).
  - Subflows attempted during the outage will not have been created unless your code retried after the server returned.
Safer approaches
- Best practice for maintenance: pause scheduling, let active runs drain, then restart (see the sketch at the end of this message for one way to check that runs have drained).
- Pause the work pool(s), wait for active runs to finish, restart, then resume:
Copy code
prefect work-pool pause <pool-name>
# ... wait for runs to finish ...
prefect work-pool resume <pool-name>
- You can also temporarily stop workers to ensure no new pulls occur.
- If you must restart while jobs are running:
  - Keep the outage short (ideally < 1–2 heartbeat intervals) to reduce “Crashed” markings.
  - Consider temporarily increasing client timeouts/retries on workers to ride through the blip:
Copy code
export PREFECT_API_REQUEST_TIMEOUT=180
export PREFECT_CLIENT_MAX_RETRIES=8
- Optionally bump heartbeat spacing so fewer are missed during a brief restart:
Copy code
export PREFECT_RUNNER_HEARTBEAT_FREQUENCY=60
- Be aware you may lose some logs/artifacts during the downtime.
Related docs
- Crash detection and heartbeat behavior: Detect zombie flows
- Worker healthchecks overview: Worker healthchecks
If you share how long your restart typically takes and whether your flows create subflows mid-run, I can suggest the least disruptive sequence (e.g., quick pause/resume vs. draining runs).
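For the “let active runs drain” step above, here is a minimal sketch of a drain check using the Prefect client. wait_for_runs_to_drain and the 30-second poll are made up for illustration, and it assumes the filter schemas are importable from prefect.client.schemas.filters as in recent 3.x releases; depending on how strict you want to be, you may also want to treat PENDING runs as active:
Copy code
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType


async def wait_for_runs_to_drain(poll_seconds: int = 30) -> None:
    """Poll the API until no flow runs are in a RUNNING state."""
    running = FlowRunFilter(
        state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.RUNNING]))
    )
    async with get_client() as client:
        while True:
            runs = await client.read_flow_runs(flow_run_filter=running, limit=1)
            if not runs:
                print("No active runs; safe to restart the server.")
                return
            print(f"Run {runs[0].name!r} still active; checking again in {poll_seconds}s")
            await asyncio.sleep(poll_seconds)


if __name__ == "__main__":
    asyncio.run(wait_for_runs_to_drain())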
i
@Marvin if I use retries in the flow decorator, will it post the webhook after the first failure?
m
thought for 208.8 seconds
It depends on what your automation is listening for.
- With @flow(retries=N), a failed attempt does not put the run in Failed — it transitions to Retrying. So an automation/webhook that triggers on “Flow run Failed” will not fire after the first failed attempt; it only fires if all retries are exhausted and the final state becomes Failed.
- If you want a webhook on the first failed attempt, configure an automation to trigger when the flow run enters Retrying. That will fire as soon as the first attempt fails. Note: this will also fire on subsequent retries unless you add filtering (e.g., only fire on the first retry) or a threshold.
Common setups
- Final failure only: trigger on Failed (and/or Crashed).
- Notify on every retry: trigger on Retrying.
- Notify on first failure only: trigger on Retrying and add a filter to limit to the first retry attempt (or use a threshold so it only fires once per run).
If you’re using Prefect Cloud Automations, you can create an Event Trigger for “flow run entered Retrying/Failed” and tune filters/thresholds in the UI JSON editor. Docs: Event triggers and background on automations: Automations overview.
Quick check: Are you using Cloud Automations or a custom webhook in your code? And do you want the webhook on the first failure, every retry, or only after all retries are exhausted? I can outline the exact trigger settings for your case.
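If you are posting the webhook from your own code rather than a Cloud Automation, a flow-level on_failure state hook is one option; a minimal sketch is below, with WEBHOOK_URL and the payload as placeholders. Because failed attempts transition to Retrying rather than Failed, this hook fires only once, after all retries are exhausted (the “final failure only” case above); for a notification on the first failed attempt you would still use an automation on the Retrying state:
Copy code
import httpx
from prefect import flow

WEBHOOK_URL = "https://example.com/hooks/flow-failed"  # placeholder endpoint


def notify_on_final_failure(flow, flow_run, state):
    # State hooks receive (flow, flow_run, state); this one runs only when the
    # run reaches a terminal Failed state, i.e. after retries are exhausted.
    httpx.post(
        WEBHOOK_URL,
        json={"flow_run_id": str(flow_run.id), "state": state.name},
        timeout=10,
    )


@flow(retries=2, retry_delay_seconds=30, on_failure=[notify_on_final_failure])
def my_flow():
    raise RuntimeError("boom")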
i
@Marvin in my parent flow, I am running subflows in a for loop. Do you think that might cause httpx read timeout errors?
m
thought for 88.9 seconds
Short answer: it can.
Why
- Every subflow creation/run emits multiple API calls (create subflow run, set states, heartbeats, logs). If your parent loop launches many subflows quickly or concurrently, you create a burst of requests.
- Under load, the API or any proxy/ingress in front of it can respond slowly enough that the client hits its read timeout (60s by default), yielding httpx.ReadTimeout.
- Even sequential subflows can trigger timeouts if each one logs a lot, your server/proxy has a 60s read/idle timeout, or there’s intermittent network latency.
Mitigations
- Throttle/stagger subflow launches (batch them, sleep a bit between iterations, or cap concurrency with a local semaphore or Prefect’s concurrency limits); see the sketch at the end of this message.
- Increase client timeouts and retries:
Copy code
export PREFECT_API_REQUEST_TIMEOUT=180
export PREFECT_CLIENT_MAX_RETRIES=8
export PREFECT_CLIENT_RETRY_EXTRA_CODES=429,502,503,504
- Optional: enable HTTP/2 to improve multiplexing if your network supports it:
Copy code
export PREFECT_API_ENABLE_HTTP2=true
- Reduce API chatter from logs to rule out log upload stalls:
Copy code
export PREFECT_LOGGING_TO_API_ENABLED=false
# or send fewer, larger batches
export PREFECT_LOGGING_TO_API_BATCH_INTERVAL=5
- If you’re self-hosting behind NGINX/ALB/etc., raise proxy_read_timeout/idle timeouts above 60s.
If you can share:
- Roughly how many subflows you spin up and whether they’re sequential or concurrent
- Cloud vs self-hosted Server (and whether there’s a proxy/ingress)
- Whether the timeout happens near the 60s mark
…I can suggest a concrete throttling pattern and exact settings for your setup. For background on crash/heartbeat behavior: Detect zombie flows.
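As mentioned above, one way to cap concurrency is a local semaphore. A minimal sketch for async subflows is below; MAX_CONCURRENT_SUBFLOWS, child, and parent are illustrative names, and if your subflows are synchronous, launching them in small batches (or sleeping briefly between iterations) achieves a similar smoothing effect:
Copy code
import asyncio

from prefect import flow

MAX_CONCURRENT_SUBFLOWS = 3  # illustrative cap; tune for your API/server


@flow
async def child(item: int):
    ...  # stand-in for your real subflow


@flow
async def parent(items: list[int]):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_SUBFLOWS)

    async def run_one(item: int):
        # Caps how many subflows (and their API calls) are in flight at once,
        # smoothing bursts of requests against the Prefect API.
        async with semaphore:
            return await child(item)

    return await asyncio.gather(*(run_one(item) for item in items))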