Riya Sinha
10/12/2025, 11:07 PM
Marvin
10/12/2025, 11:07 PM
Marvin
10/12/2025, 11:16 PM
@flow(retries=3, retry_delay_seconds=60) or on tasks. See Retries.
Parent and child (subflows)
- Subflows run in the same process and behave like callable units. If a child flow raises, that exception will propagate to the parent unless you catch it.
- It’s common to:
  - Fail the parent if any critical child fails.
  - Allow the parent to complete while recording/reporting failed children if the parent’s job is “orchestrate and report” (best-effort pattern).
Patterns you can use
1) Orchestrator that completes if at least some children succeed
from prefect import flow, get_run_logger

@flow
def child_flow(i: int) -> int:
    if i % 2:
        raise RuntimeError(f"child {i} failed")
    return i * 2

@flow(retries=1, retry_delay_seconds=30)
def parent_flow(n: int = 5, require_all_success: bool = False):
    log = get_run_logger()
    successes, failures = [], []
    for i in range(n):
        try:
            out = child_flow(i)  # child exceptions propagate unless caught
            successes.append((i, out))
        except Exception as exc:
            log.warning(f"Child {i} failed: {exc}")
            failures.append(i)
    if require_all_success and failures:
        raise RuntimeError(f"{len(failures)} child flows failed: {failures}")
    # Completed if we didn’t raise; still returns a clear summary
    return {"successes": successes, "failures": failures}
2) Best-effort tasks with downstream aggregation
- By default, Prefect will not start a downstream task if any of its inputs failed. To pass failed task results into a downstream step anyway, wrap them in `allow_failure`; the downstream task then receives the attached exception in place of the value:
from prefect import flow, task, allow_failure

@task
def maybe_parse(x: str) -> int:
    if "bad" in x:
        raise ValueError("parse failed")
    return len(x)

@task
def aggregate(items):
    # With allow_failure, a failed upstream arrives as its exception object;
    # decide what “success” means here
    ok, errs = [], []
    for item in items:
        if isinstance(item, BaseException):
            errs.append(str(item))
        else:
            ok.append(item)
    return {"ok": ok, "errors": errs}

@flow
def pipeline(payloads: list[str]):
    futs = [maybe_parse.submit(p) for p in payloads]
    safe = [allow_failure(f) for f in futs]
    summary = aggregate(safe)
    # Completed if aggregate didn't raise; you can raise if errors > threshold
    return summary
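The "raise if errors > threshold" idea could look roughly like the sketch below. It is plain Python with no Prefect calls; the helper name `enforce_error_threshold` and the `max_error_ratio` parameter are assumptions made for illustration, and it expects a summary shaped like `{"ok": [...], "errors": [...]}`.

```python
def enforce_error_threshold(summary: dict, max_error_ratio: float = 0.5) -> dict:
    """Return the summary unchanged, or raise if too many items failed.

    Hypothetical helper: the name and the ratio-based rule are illustrative,
    not part of Prefect.
    """
    n_ok = len(summary["ok"])
    n_err = len(summary["errors"])
    total = n_ok + n_err
    # An empty batch counts as zero errors rather than dividing by zero.
    ratio = n_err / total if total else 0.0
    if ratio > max_error_ratio:
        raise RuntimeError(
            f"{n_err}/{total} items failed (limit {max_error_ratio:.0%})"
        )
    return summary
```

Calling it on the summary just before the flow returns would make the run fail only when the error ratio exceeds the limit, and complete otherwise.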
Hooks and notifications
- You can attach on_failure, on_completion, etc. on flows for alerts/side effects without changing success criteria. See the flow decorator options (e.g., on_failure) in the API.
Practical guidance
- Define success criteria per flow. Completed means those criteria are satisfied.
- Catch and log non-critical issues; fail only when the objective cannot be met.
- For parent/child:
Marvin
10/12/2025, 11:16 PM