Kiran
09/08/2025, 12:19 PM
Marvin
09/08/2025, 12:19 PM
Marvin
09/08/2025, 12:24 PM
- When an attempt fails and the run has retries configured, Prefect transitions the run to the AwaitingRetry state. AwaitingRetry is a SCHEDULED state for a future attempt, not a terminal state.
- Because the run is SCHEDULED, its result is not available yet. That’s why prefect.exceptions.UnfinishedRun: "Run is in SCHEDULED state, its result is not available" is raised if you try to read the result immediately.
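To make the terminal vs. non-terminal split concrete, here is a minimal plain-Python sketch. It does not call Prefect at all; the state type names mirror Prefect's, while `TERMINAL_TYPES` and `is_terminal` are hypothetical names used only for illustration.

```python
# Illustrative only: models the rule "read the result only once the run is terminal".
# The state type names mirror Prefect's; the helper names are hypothetical.
TERMINAL_TYPES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def is_terminal(state_type: str) -> bool:
    """Return True only when the state type is one of the terminal types."""
    return state_type.upper() in TERMINAL_TYPES


# AwaitingRetry is a named state whose type is SCHEDULED, so it is not terminal.
print(is_terminal("SCHEDULED"))  # False
print(is_terminal("COMPLETED"))  # True
```

Anything that is not in the terminal set (SCHEDULED, PENDING, RUNNING, and so on) means the run may still change, so reading its result is premature.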
A few important notes and fixes:
1) Don’t “return Failed(...)” from your flow body
- In Prefect 3.x, returning a state object from your flow function is not the recommended way to fail it.
- To fail a flow (or task) on a condition, raise an exception. If retries are set, Prefect will record the attempt as failed/crashed and schedule another attempt.
Example (flow-level retries):
from prefect import flow, get_run_logger

@flow(retries=2, retry_delay_seconds=30)
def my_flow():
    logger = get_run_logger()
    if some_condition():
        # Fail this attempt and trigger a retry
        raise RuntimeError("FLOW_FAILURE::OOM (simulated condition)")
    return "ok"
Example (task-level retries):
from prefect import task

@task(retries=3, retry_delay_seconds=[10, 30, 60])
def my_task():
    if some_condition():
        raise ValueError("Failing task to trigger retry")
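As a rough illustration of how a list such as `retry_delay_seconds=[10, 30, 60]` maps onto retries, here is a plain-Python sketch. It does not use Prefect; `delay_before_retry` is a hypothetical helper, and reusing the last entry when retries outnumber entries is an assumption about the behavior rather than something stated above.

```python
from typing import Sequence, Union


def delay_before_retry(delays: Union[float, Sequence[float]], retry_number: int) -> float:
    """Delay in seconds before the given retry (1 = first retry)."""
    if retry_number < 1:
        raise ValueError("retry_number starts at 1")
    if isinstance(delays, (int, float)):
        # A single number means the same delay before every retry.
        return float(delays)
    # Assumption: entries apply in order; the last one is reused if the list is short.
    index = min(retry_number - 1, len(delays) - 1)
    return float(delays[index])


schedule = [delay_before_retry([10, 30, 60], n) for n in (1, 2, 3)]
print(schedule)  # [10.0, 30.0, 60.0]
```

With `retries=3`, that gives a 10 second wait before the first retry, 30 before the second, and 60 before the third. During each of those waits the run sits in AwaitingRetry.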
2) Why you see AwaitingRetry and UnfinishedRun
- The log line “Finished in state AwaitingRetry(..., type=SCHEDULED)” means the current attempt ended and Prefect scheduled a retry for later.
- If you are calling the flow from Python and immediately try to access the result while it’s in SCHEDULED (AwaitingRetry), you’ll get UnfinishedRun. Wait until the run reaches a terminal state (Completed/Failed/Cancelled/Crashed) before retrieving the result.
Ways to wait for final completion:
- If you trigger a deployment from Python, use wait_for_flow_run to block until the run reaches a terminal state:
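from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

async def run_and_wait():
    # timeout=0 returns the FlowRun right after it is created instead of blocking
    flow_run = await run_deployment("my-project/my-deployment", parameters={...}, timeout=0)
    # wait_for_flow_run is async, takes the flow run's ID, and returns the FlowRun
    flow_run = await wait_for_flow_run(flow_run.id, poll_interval=5)
    final_state = flow_run.state
    return final_state.result()  # only now is it safe (await this if your version returns a coroutine)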
- If you’re working with a PrefectFlowRunFuture, you can wait before calling .result().
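The idea behind both options is the same polling loop, sketched below in plain Python without any Prefect calls. `wait_until_terminal` and `read_state_type` are hypothetical names; in a real setup the callable would look up the run's current state type through the Prefect client.

```python
import time
from typing import Callable, Optional

TERMINAL_TYPES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def wait_until_terminal(
    read_state_type: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
) -> str:
    """Poll read_state_type() until it returns a terminal state type."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state_type = read_state_type()
        if state_type in TERMINAL_TYPES:
            return state_type
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"run still in {state_type} after {timeout}s")
        time.sleep(poll_interval)


# Simulated run: the first attempt fails, the run waits for its retry, then completes.
observed = iter(["RUNNING", "SCHEDULED", "RUNNING", "COMPLETED"])
final = wait_until_terminal(lambda: next(observed), poll_interval=0.01)
print(final)  # COMPLETED
```

Note that the loop passes straight through SCHEDULED: an AwaitingRetry run is simply "not done yet", which is why a one-off read at that moment fails while a wait succeeds.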
3) About the 137 OOM/SIGKILL message
- Exit code 137 indicates the OS killed the process (often due to memory limits). Prefect records these as crashes. With retries configured, Prefect will still schedule another attempt.
- To address real OOMs, increase memory limits/requests on your worker/infrastructure (Docker/Kubernetes/etc.), reduce parallelism/concurrency, or lower memory usage in your code.
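If it helps to see where 137 comes from: exit codes above 128 conventionally mean "128 + signal number", and 137 - 128 = 9, which is SIGKILL. A small sketch using only Python's standard library follows; `describe_exit_code` is a hypothetical helper, and the signal name lookup assumes a POSIX system (Linux/macOS).

```python
import signal


def describe_exit_code(code: int) -> str:
    """Explain a process exit code using the 128 + signal number convention."""
    if code > 128:
        signum = code - 128
        try:
            name = signal.Signals(signum).name
        except ValueError:
            # Signal number not known on this platform.
            name = f"signal {signum}"
        return f"killed by {name}"
    return "exited normally" if code == 0 else f"exited with error code {code}"


print(describe_exit_code(137))  # killed by SIGKILL (on Linux/macOS)
```

SIGKILL cannot be caught by the process, so your flow code never gets a chance to raise or clean up. That is why the attempt shows up as a crash rather than an ordinary failure.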
Can you share:
- How you’re running the flow (local script vs. deployment/worker)?
- A small snippet showing where you return Failed(...) and how you’re invoking the flow / reading its result?
With that, I can point you to the exact fix for your setup.