Hey <@ULVA73B9P> prefect.flow_runs Finished in sta...
# ask-community
k
Hey @Marvin prefect.flow_runs Finished in state AwaitingRetry('FLOW_FAILURE:OOM:Process exited with code 137 — SIGKILL detected (likely due to Out Of Memory or external kill).', type=SCHEDULED) , prefect.exceptions.UnfinishedRun: Run is in SCHEDULED state, its result is not available. This is from my flow run log. I am failing my flow with return Failed(('FLOW_FAILURE:OOM:Process exited with code 137 — SIGKILL detected (likely due to Out Of Memory or external kill). upon hitting a condition, and I have added retries. Why am I seeing an AwaitingRetry state and an UnfinishedRun error?
m
Short answer: this is expected when a flow has retries.
- When a flow attempt fails or crashes and you’ve configured `retries`, Prefect transitions the run to the AwaitingRetry state. AwaitingRetry is a SCHEDULED state for a future attempt, not a terminal state.
- Because the run is SCHEDULED, its result is not available yet. That’s why `prefect.exceptions.UnfinishedRun: Run is in SCHEDULED state, its result is not available` is raised if you try to read the result immediately.

A few important notes and fixes:

1) Don’t “return Failed(...)” from your flow body
- In Prefect 3.x, returning a state object from your flow function is not the recommended way to fail it.
- To fail a flow (or task) on a condition, raise an exception. If `retries` are set, Prefect will record the attempt as failed/crashed and schedule another attempt.

Example (flow-level retries):
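```python
from prefect import flow, get_run_logger


def some_condition() -> bool:
    # Placeholder for your own failure check
    return False


@flow(retries=2, retry_delay_seconds=30)
def my_flow():
    logger = get_run_logger()
    if some_condition():
        logger.error("Failure condition hit; raising to fail this attempt")
        # Fail this attempt and trigger a retry
        raise RuntimeError("FLOW_FAILURE:OOM (simulated condition)")
    return "ok"
```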
Example (task-level retries):
```python
from prefect import task


def some_condition() -> bool:
    # Placeholder for your own failure check
    return False


@task(retries=3, retry_delay_seconds=[10, 30, 60])
def my_task():
    if some_condition():
        raise ValueError("Failing task to trigger retry")
```
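To make the retry sequence concrete, here is a toy model in plain Python of how attempts and states line up when retries are configured. It does not use Prefect and is not how Prefect is implemented; `run_with_retries`, `always_fails`, and the state strings are hypothetical names chosen for illustration. The only behavior it borrows from this thread is that a failed attempt with retries remaining lands in a non-terminal AwaitingRetry state, and only the last attempt produces a terminal one.

```python
from typing import Callable, List


def run_with_retries(attempt_fn: Callable[[], str], retries: int) -> List[str]:
    """Toy model (hypothetical, not Prefect code): return the state history."""
    history: List[str] = []
    for attempt in range(retries + 1):  # one initial attempt plus `retries` more
        history.append("Running")
        try:
            attempt_fn()
        except Exception:
            if attempt < retries:
                # Retries remain: the run is rescheduled, not finished
                history.append("AwaitingRetry")
                continue
            history.append("Failed")  # retries exhausted: terminal state
            return history
        history.append("Completed")  # success: terminal state
        return history
    return history


def always_fails() -> str:
    raise RuntimeError("FLOW_FAILURE:OOM (simulated condition)")


print(run_with_retries(always_fails, retries=2))
```

With `retries=2` and an attempt that always fails, the history alternates Running and AwaitingRetry twice before ending in Failed, which is why reading the result while the run sits in AwaitingRetry is premature.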
2) Why you see AwaitingRetry and UnfinishedRun
- The log line “Finished in state AwaitingRetry(..., type=SCHEDULED)” means the current attempt ended and Prefect scheduled a retry for later.
- If you are calling the flow from Python and immediately try to access the result while it’s in SCHEDULED (AwaitingRetry), you’ll get `UnfinishedRun`. Wait until the run reaches a terminal state (Completed/Failed/Cancelled/Crashed) before retrieving the result.

Ways to wait for final completion:
- If you trigger a deployment from Python, use `wait_for_flow_run` to block until the run reaches a terminal state:
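```python
import asyncio

from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

# timeout=0 returns the FlowRun immediately instead of waiting for it to finish
flow_run = run_deployment("my-project/my-deployment", parameters={...}, timeout=0)
# wait_for_flow_run is a coroutine; it takes the flow run ID and returns the FlowRun
finished = asyncio.run(wait_for_flow_run(flow_run.id))
result = finished.state.result()  # only now is it safe
```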
- If you’re working with a `PrefectFlowRunFuture`, you can wait before calling `.result()`.

3) About the 137 OOM/SIGKILL message
- Exit code 137 (128 + 9) indicates the process was terminated by SIGKILL, often by the kernel OOM killer or a container memory limit. Prefect records these as crashes. With retries configured, Prefect will still schedule another attempt.
- To address real OOMs, increase memory limits/requests on your worker/infrastructure (Docker/Kubernetes/etc.), reduce parallelism/concurrency, or lower memory usage in your code.

Can you share:
- How you’re running the flow (local script vs. deployment/worker)?
- A small snippet showing where you `return Failed(...)` and how you’re invoking the flow / reading its result?

With that, I can point you to the exact fix for your setup.
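
As a side note on the exit code in the log: by the common Unix shell convention, an exit status above 128 means the process was terminated by signal number `status - 128`, so 137 corresponds to signal 9, SIGKILL. The small helper below decodes that. `describe_exit_code` is a hypothetical name, it uses only the standard library `signal` module, and it assumes a Unix-like system.

```python
import signal


def describe_exit_code(code: int) -> str:
    """Decode a shell-style exit status (hypothetical helper, Unix only)."""
    if code == 0:
        return "exited normally"
    if code > 128:
        try:
            # Shell convention: 128 + N means "killed by signal N"
            return f"killed by {signal.Signals(code - 128).name}"
        except ValueError:
            pass  # not a known signal number on this platform
    return f"exited with error code {code}"


print(describe_exit_code(137))
```

The exit status cannot tell you who sent SIGKILL: the kernel OOM killer, a container runtime enforcing a memory limit, and a manual `kill -9` all look the same, so check the infrastructure logs to confirm an actual OOM.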