Kiran
09/22/2025, 7:42 AM
Marvin
09/22/2025, 7:42 AM
Marvin
09/22/2025, 7:46 AM
Replace the example tasks a, b, c, d, e, f, g with your real tasks.
from prefect import flow, task
# Example tasks: stand-ins for your real work. Each one prints a single line;
# log_prints=True forwards that output to the Prefect run logs.
@task(name="a", log_prints=True)
def a():
    """Placeholder task 'a'."""
    print("Running a")


@task(name="b", log_prints=True)
def b():
    """Placeholder task 'b'."""
    print("Running b")


@task(name="c", log_prints=True)
def c():
    """Placeholder task 'c'."""
    print("Running c")


@task(name="d", log_prints=True)
def d():
    """Placeholder task 'd'."""
    print("Running d")


@task(name="e", log_prints=True)
def e():
    """Placeholder task 'e'."""
    print("Running e")


@task(name="f", log_prints=True)
def f():
    """Placeholder task 'f'."""
    print("Running f")


@task(name="g", log_prints=True)
def g():
    """Placeholder task 'g'."""
    print("Running g")
@flow(name="Grouped chained tasks with gated final group")
def orchestrate():
    """Run each group as a sequential chain, then gate the last group.

    The first task of every non-final group is submitted with no upstream,
    so those chains start concurrently. Inside a group each task waits on its
    predecessor through ``wait_for``. The first task of the last group waits
    on the final future of every earlier group. Empty groups are skipped
    rather than raising ``IndexError``.

    Returns:
        The result of the last task of the last group, or None when the last
        group is empty.
    """
    # Your list of task groups
    groups = [
        [a, b],  # group 1 runs a -> b
        [c, d, e],  # group 2 runs c -> d -> e
        [f],  # group 3 runs f
        [g],  # last group: runs only after all previous groups complete
    ]

    def _submit_chain(group, upstream):
        """Submit ``group`` as a chain whose first task waits on ``upstream``.

        Returns the future of the group's last task, or None for an empty group.
        """
        prev = None
        for position, t in enumerate(group):
            # wait_for takes an iterable of futures, so wrap the predecessor.
            deps = upstream if position == 0 else [prev]
            prev = t.submit(wait_for=deps)
        return prev

    # Schedule all groups except the last one; their first tasks start together.
    last_futures_of_prior_groups = []
    for group in groups[:-1]:
        last = _submit_chain(group, upstream=None)
        if last is not None:
            last_futures_of_prior_groups.append(last)

    # The last group starts only after every prior group's final task completed.
    prev = _submit_chain(groups[-1], upstream=last_futures_of_prior_groups)
    # Resolve the last future so the flow returns the final task's result.
    return prev.result() if prev is not None else None
if __name__ == "__main__":
    # Run the flow once when this file is executed as a script.
    orchestrate()
Notes:
- Using .submit() for every task ensures each run is registered and visible in the UI immediately.
- The wait_for= dependency makes a downstream task start as soon as its upstream completes successfully. If any upstream fails, Prefect will not run the dependents (so the last group won't run unless all prior groups complete successfully).
- If you want different failure behavior (e.g., continue even if a prior group fails), we can adjust with allow_failure or custom logic.
If your tasks take inputs/outputs, we can thread those into the .submit() calls as args/kwargs while keeping the same structure. Want me to adapt this to your exact functions and parameters?
Kiran
09/22/2025, 1:40 PM
Marvin
09/22/2025, 1:40 PM
Marvin
09/22/2025, 1:47 PMfrom prefect import flow, task, get_run_logger
from prefect.futures import as_completed, resolve_futures_to_states
@task
def work(i: int) -> int:
    """Example task: return ``i * 2``; raises ValueError when ``i == 3``."""
    should_fail = i == 3  # simulate a failure for one specific input
    if should_fail:
        raise ValueError("Boom")
    doubled = i * 2
    return doubled
def run_group(group_name: str, items: list[int]):
    """Submit ``work`` for every item at once and fail fast on the first error.

    Futures are consumed in completion order. When one raises, the error is
    logged and every future that is not yet done is cancelled (best effort).
    All futures are then resolved to final states so the UI shows them.

    Returns:
        ``(futures, ok)`` where ``ok`` is False if a consumed future raised.
    """
    logger = get_run_logger()
    futures = [work.submit(i) for i in items]
    ok = True
    for fut in as_completed(futures):
        try:
            fut.result()  # re-raises the task's exception if it failed
        except Exception as exc:
            ok = False
            logger.error(f"{group_name}: task failed: {exc}; cancelling remaining tasks in this group")
            for other in futures:
                if other.done():
                    continue
                other.cancel()  # will show as Cancelled in the UI
            break
    # Resolve everything so the UI reflects final states.
    resolve_futures_to_states(futures)
    return futures, ok
@flow
def pipeline():
    """Run group 1, then group 2, then a final group, each gated on success.

    Every group runs through ``run_group`` (fail fast within the group).
    Group 2 runs only if group 1 fully succeeded, and the final group only if
    both did. Otherwise the final group's tasks are submitted and immediately
    cancelled so they appear as Cancelled in the UI. Cancellation is best
    effort: a task that has already started may still run.
    """
    logger = get_run_logger()
    # Group 1
    _, g1_ok = run_group("group-1", list(range(5)))
    # Group 2 (only if group 1 fully succeeded)
    if g1_ok:
        _, g2_ok = run_group("group-2", list(range(5, 10)))
    else:
        g2_ok = False
    # Final group should be "skipped" (i.e., show as Cancelled) if any previous group failed
    if g1_ok and g2_ok:
        run_group("final-group", list(range(10, 12)))
    else:
        # Submit the final tasks and immediately cancel them so the UI shows them as Cancelled
        logger.info("Previous groups not fully successful; marking final group as cancelled")
        final_futures = [work.submit(i) for i in [10, 11]]
        for f in final_futures:
            f.cancel()
        resolve_futures_to_states(final_futures)
if __name__ == "__main__":
    # Script entry point: execute the flow once.
    pipeline()
Notes:
- Cancellation is best-effort: tasks that are already running may still complete or fail before the cancellation takes effect.
- If you prefer a single "collector" task instead of submitting the final group, you can guard it with an aggregate check and raise Abort from within the task to set it to Cancelled:
from prefect.exceptions import Abort
@task
def final_task(prereqs_ok: bool):
    """Collector task that does its work only when prerequisites succeeded.

    Raises:
        Abort: when ``prereqs_ok`` is falsy, so the final step is skipped.
    """
    if prereqs_ok:
        ...  # your real final work goes here
        return None
    raise Abort("Skipping final task because prerequisites failed")
- If you don't submit a task at all, it won't appear in the graph; to make it appear "skipped", you must submit it and then cancel it.
If you share your current code snippet (how you're creating "groups" and wiring dependencies), I can tailor this pattern to your exact structure.
Kiran
09/23/2025, 6:56 AM
Marvin
09/23/2025, 6:56 AM
Marvin
09/23/2025, 6:58 AMfrom prefect import flow, task, get_run_logger
from prefect.futures import as_completed, resolve_futures_to_states
@task
def work(group: str, idx: int, payload: int) -> int:
    """Example task: double ``payload``; fails when ``payload == 3``.

    Replace this logic with your real work.
    """
    if payload != 3:
        return payload * 2
    # Example failure
    raise ValueError(f"{group}[{idx}] failed")
def run_group_remaining(group_name: str, items: list[int], start_idx: int = 1):
    """Run ``items[start_idx:]`` one at a time, stopping at the first failure.

    Each task is submitted and awaited before the next one is submitted, so
    the group runs sequentially and a failure prevents further real work.
    Tasks after the failing one are submitted and immediately cancelled
    (best effort) so they show up as Cancelled in the UI.

    Returns:
        True if every task that ran completed successfully, otherwise False.
    """
    logger = get_run_logger()
    futures = []
    # Track success explicitly: probing futures for an ``exception`` attribute
    # is not a reliable failure signal.
    ok = True
    for idx in range(start_idx, len(items)):
        fut = work.submit(group=group_name, idx=idx, payload=items[idx])
        futures.append(fut)
        try:
            fut.result()  # blocks; raises if this task failed
        except Exception as exc:
            ok = False
            logger.error(f"{group_name}: task {idx} failed: {exc}; cancelling remaining tasks in this group")
            # Submit-and-cancel the rest so they show as Cancelled in the UI
            for j in range(idx + 1, len(items)):
                skipped = work.submit(group=group_name, idx=j, payload=items[j])
                skipped.cancel()
                futures.append(skipped)
            break
    # Finalize states in the UI
    resolve_futures_to_states(futures)
    return ok
@flow
def pipeline():
    """Run grouped work with fail-fast groups and a gated final group.

    1. Submit the first task of every non-empty group at once.
    2. Wait for each first task. If a group's first task failed, log it and
       submit-and-cancel the group's remaining tasks so they show as Cancelled.
    3. For each group whose first task succeeded, run the remaining tasks via
       ``run_group_remaining``. That helper blocks on every task, so these
       groups run one after another, not in parallel.
    4. Run the final group only if every group fully succeeded; otherwise
       submit-and-cancel its tasks.
    """
    logger = get_run_logger()
    groups = {
        "group-1": [0, 1, 2, 3, 4],  # 3 will fail in our example
        "group-2": [5, 6, 7, 8, 9],
        "group-3": [10, 11],
    }
    # 1) Kick off FIRST TASK of every group concurrently
    first_futures = {
        name: work.submit(group=name, idx=0, payload=items[0])
        for name, items in groups.items()
        if items  # non-empty
    }
    # 2) Wait for first tasks; mark groups that fail and cancel their remaining tasks
    group_ok = {name: True for name in groups}
    all_futures = list(first_futures.values())
    for name, f in first_futures.items():
        try:
            f.result()
        except Exception as exc:
            group_ok[name] = False
            logger.error(f"{name}: first task failed: {exc}; cancelling remaining tasks in this group")
            # Submit-and-cancel remaining tasks so they appear as Cancelled in the UI
            for idx in range(1, len(groups[name])):
                fj = work.submit(group=name, idx=idx, payload=groups[name][idx])
                fj.cancel()
                all_futures.append(fj)
    # 3) For groups whose first task succeeded, run the remaining tasks.
    #    run_group_remaining blocks, so groups are processed one at a time.
    for name, items in groups.items():
        if group_ok[name] and len(items) > 1:
            group_ok[name] = run_group_remaining(name, items, start_idx=1)
    # 4) Final group: run if ALL previous groups fully succeeded; otherwise submit-and-cancel
    final_items = [100, 101]
    if all(group_ok.values()):
        final_futures = [work.submit(group="final", idx=i, payload=v) for i, v in enumerate(final_items)]
    else:
        logger.info("Previous groups not fully successful; cancelling final group")
        final_futures = [work.submit(group="final", idx=i, payload=v) for i, v in enumerate(final_items)]
        for f in final_futures:
            f.cancel()
    all_futures.extend(final_futures)
    resolve_futures_to_states(all_futures)
if __name__ == "__main__":
    # Run the example pipeline when invoked directly.
    pipeline()
What this does:
- First tasks from all groups run at once.
Marvin
09/23/2025, 6:58 AM
Kiran
09/23/2025, 7:08 AM
Marvin
09/23/2025, 7:08 AM
Marvin
09/23/2025, 7:09 AM
Marvin
09/23/2025, 7:09 AM
Kiran
09/23/2025, 7:16 AM
Marvin
09/23/2025, 7:16 AM
Marvin
09/23/2025, 7:17 AMfrom prefect import flow, task, get_run_logger
from prefect.futures import resolve_futures_to_states
@task
def work(group: str, idx: int, payload: int, upstream=None) -> int:
    """Example task: double ``payload``; fails when ``payload == 3``.

    ``upstream`` is not read. Passing the previous future here only creates
    a data dependency so this task waits for it. Replace the body with your
    real logic.
    """
    if payload != 3:
        return payload * 2
    raise ValueError(f"{group}[{idx}] failed")
def submit_group_chain(group_name: str, items: list[int]):
    """Submit one ``work`` task per item as a sequential chain.

    Task ``i`` receives the future of task ``i - 1`` as ``upstream``, so it
    waits for its predecessor. The first task has no upstream.

    Returns:
        The submitted futures, in order.
    """
    chain = []
    for idx, val in enumerate(items):
        upstream = chain[-1] if chain else None
        chain.append(work.submit(group=group_name, idx=idx, payload=val, upstream=upstream))
    return chain
def monitor_group_chain(group_name: str, futures: list):
    """Wait through a submitted chain in order and stop at the first failure.

    When a task's ``result()`` raises, the error is logged and every later
    future in the chain is cancelled (best effort). All futures are then
    resolved to final states.

    Args:
        group_name: Label used in log messages.
        futures: The chain's futures, in submission order.

    Returns:
        True if every task completed successfully, else False.
    """
    logger = get_run_logger()
    ok = True
    for i, f in enumerate(futures):
        try:
            f.result()
        except Exception as exc:
            ok = False
            logger.error(f"{group_name}: task {i} failed: {exc}; cancelling remaining tasks in this group")
            for ff in futures[i + 1:]:
                try:
                    ff.cancel()
                except Exception as cancel_exc:
                    # Cancellation stays best effort, but a failure to cancel
                    # is recorded instead of being silently swallowed.
                    logger.warning(f"{group_name}: could not cancel a remaining task: {cancel_exc}")
            break
    resolve_futures_to_states(futures)
    return ok
@flow
def pipeline():
    """Run each group as a chain, then gate the dependent final group.

    All non-dependent groups are submitted as chains up front. Each chain's
    first task has no upstream, so the first tasks start concurrently. Each
    group is then monitored, with fail-fast cancellation inside the group.
    The dependent group runs only if every other group fully succeeded;
    otherwise its tasks are submitted and cancelled so they show as Cancelled.
    """
    logger = get_run_logger()
    # Define your groups and choose the dependent (final) group
    groups = {
        "group-1": [0, 1, 2, 3, 4],  # 3 will fail in this example
        "group-2": [5, 6, 7, 8, 9],
        "group-3": [10, 11],
        "final": [100, 101],  # depends on all other groups succeeding
    }
    dependent_group_name = "final"
    # Submit chains for all non-dependent groups.
    non_dependent = {k: v for k, v in groups.items() if k != dependent_group_name}
    group_futures = {name: submit_group_chain(name, items) for name, items in non_dependent.items()}
    # Monitor each group; if a task fails, cancel the remaining tasks in that group
    group_ok = {name: monitor_group_chain(name, futures) for name, futures in group_futures.items()}
    final_items = groups[dependent_group_name]
    # Handle dependent final group
    if all(group_ok.values()):
        # Run the final group's chain (sequentially via upstream deps)
        final_futures = submit_group_chain(dependent_group_name, final_items)
        final_ok = monitor_group_chain(dependent_group_name, final_futures)
        logger.info(f"Final group success: {final_ok}")
    else:
        # Submit-and-cancel the final group's tasks so they appear as Cancelled in the UI
        logger.info("Previous groups not fully successful; cancelling dependent final group")
        final_futures = submit_group_chain(dependent_group_name, final_items)
        for f in final_futures:
            f.cancel()
        resolve_futures_to_states(final_futures)
if __name__ == "__main__":
    # Direct execution runs the flow once.
    pipeline()
Notes:
- Passing the previous task's future as the upstream argument enforces the per-group chain (task[i] waits on task[i-1]).
Marvin
09/23/2025, 7:17 AM
Kiran
09/23/2025, 9:03 AM
Marvin
09/23/2025, 9:03 AM
Marvin
09/23/2025, 9:08 AM
A task submitted with wait_for=[upstream] will not start until the upstream has completed successfully. If the upstream fails or is cancelled (after exhausting retries, if any), the downstream will not run and never reaches Completed (depending on the Prefect version it is left in a Pending/"NotReady" state or shown as cancelled).
Notes:
- If the upstream is retrying, the downstream waits until the upstream reaches a terminal state (Completed/Failed/Cancelled).
- If you need the downstream to run even when the upstream fails, wrap the upstream with allow_failure.
Example:
from prefect import flow, task
from prefect.utilities.annotations import allow_failure
@task
def upstream():
    """Placeholder upstream task."""
    pass


@task
def downstream():
    """Placeholder task that should run only after a successful upstream."""
    pass


@task
def downstream_even_if_upstream_failed(_=None):
    """Placeholder task meant to run regardless of the upstream outcome.

    The argument is unused. It defaults to None so the task can be submitted
    with only ``wait_for=[allow_failure(...)]`` and no positional value;
    without the default, parameter binding fails for that call.
    """
    pass
@flow
def my_flow():
    """Contrast a success-gated dependency with a failure-tolerant one."""
    u = upstream.submit()
    # Runs only if upstream completes successfully
    success_only = [u]
    d = downstream.submit(wait_for=success_only)
    # Runs even if upstream fails
    tolerant = [allow_failure(u)]
    d_anyway = downstream_even_if_upstream_failed.submit(wait_for=tolerant)
Useful links:
- Prefect docs, "Run work concurrently" (covers waiting/dependencies)
- Prefect docs, "Task run states" (what Completed/Failed/Cancelled mean)
If you're on a different version or want to handle failures in a specific way (e.g., branching on upstream state), let me know and I can tailor an example.
Kiran
09/23/2025, 10:09 AM
Marvin
09/23/2025, 10:09 AM
Marvin
09/23/2025, 10:10 AMf.done() # True if the future has reached a final state
- Inspect the state explicitly
state = f.get_state()
state.is_final() # True if final (Completed/Failed/Cancelled/Crashed)
- For multiple futures (optionally with a timeout)
from prefect.futures import wait
result = wait([f1, f2], timeout=0.0) # non-blocking check
done = result.done
not_done = result.not_done
Notes:
- f.result() blocks until the future is final; use it only if you want to wait.
- Final states include Completed, Failed, Cancelled, and Crashed; non-final states include Pending, Running, AwaitingRetry, etc.
Refs:
- Concurrency patterns (as_completed, wait): Prefect docs, "Run work concurrently"
- State model details: Prefect docs, "Task run states"
Kiran
09/23/2025, 12:33 PM
Marvin
09/23/2025, 12:34 PM
Marvin
09/23/2025, 12:37 PMfrom prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    """Async flow skeleton whose submitted tasks use up to 8 worker threads."""
    ...  # keep your existing flow body here
2) time.sleep blocking
- Your task uses time.sleep. If your runner is not actually using threads/processes, sleep will block and serialize execution.
- Options:
- Keep ps_task sync but ensure a concurrent runner (ThreadPoolTaskRunner/ProcessPoolTaskRunner) is set as above.
- Or make the task async and use asyncio.sleep to avoid blocking:
from prefect import task
@task(log_prints=True)
async def ps_task(name, delay=5):
    """Simulated async job that sleeps without blocking the event loop.

    Raises:
        Exception: for ``"PS_TPLSweep_General_PSwaps"`` (simulated failure).
    """
    # The snippet's only import is ``task``; asyncio is needed for sleep.
    import asyncio

    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        await asyncio.sleep(10)  # extra delay for this specific job
    await asyncio.sleep(delay)
    return f"{name} done"
3) Minor notes (not blockers, but good hygiene)
- With return_state=True, .submit() waits for the task to finish and returns its final State rather than a future, so there is nothing left to wait() on, and because it blocks, your groups stop overlapping. To keep groups concurrent, submit without return_state, wait on the future off the event loop (e.g. with asyncio.to_thread), and then read its state.
- You're passing wait_for=[previous_task_state]. It is the right object only if previous_task_state is the previous future (the name is a bit misleading); with return_state=True it would be a State instead.
- If you want the "skipping" action to be tracked as a Prefect task, submit it:
skipping_task.submit(task_name)
Quick sanity check
- With a proper concurrent task runner, your three non-final groups should start together:
  - Group 1 first task sleeps ~15s
  - Group 2 first task ~5s
  - Group 3 first task ~5s
- You should see all three first tasks start at nearly the same time.
Docs
- Concurrency patterns: Prefect docs, "Run work concurrently"
If you can confirm your Prefect version (2.x vs 3.x), I can give you the exact snippet to drop in.
Kiran
09/23/2025, 12:39 PM
Marvin
09/23/2025, 12:40 PM
Marvin
09/23/2025, 12:41 PMfrom prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.states import Cancelled
import asyncio
import time
@task(log_prints=True)
def ps_task(name, delay=5):
    """Simulated blocking job: log, optionally fail, then sleep.

    Raises:
        Exception: for ``"PS_TPLSweep_General_PSwaps"`` (simulated failure).
    """
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    # One job gets an extra 10-second sleep before the regular delay.
    is_slow_job = name == "PS_Extend_General_PSwaps"
    if is_slow_job:
        time.sleep(10)
    time.sleep(delay)
    return f"{name} done"
@task(log_prints=True)
def skipping_task(task_name):
    """Marker task recording that a group was skipped after a failure.

    Returning a State object from a Prefect task makes that State the task
    run's final state. This run is therefore recorded as Cancelled (state
    name "SKIPPING") rather than Completed.
    """
    return Cancelled(message=f"{task_name} task is skipping", name="SKIPPING")
async def execute_group(group):
    """Run a group's tasks one after another without blocking other groups.

    Each task is submitted with ``wait_for`` on its predecessor. The blocking
    wait for a task to finish runs in a worker thread (``asyncio.to_thread``).
    That keeps the event loop free, so coroutines for other groups started
    with ``asyncio.gather`` make progress at the same time. A blocking call
    made directly in this coroutine would run the groups strictly one at a
    time.

    Returns:
        True if every task in the group completed, False if the group halted
        early because a task did not complete. In that case a
        ``skipping_task`` is submitted as a marker.
    """
    previous_future = None
    for task_name in group:
        # Submit without return_state=True: with it, .submit() blocks until
        # the task finishes and returns a State instead of a future.
        task_future = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future is not None else None,
        )
        # Wait off the event loop, then read the final state.
        await asyncio.to_thread(task_future.wait)
        state = task_future.state
        if state.is_completed():
            print(f"{task_name} completed")
        else:
            print(f"{task_name} failed → skipping rest of group")
            skipping_task.submit(task_name)
            return False
        previous_future = task_future
    return True
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    """Run all non-final groups together, then the final group if all completed."""
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]
    *leading_groups, final_group = client_tasks
    # Launch every non-final group at once and collect their success flags.
    group_results = await asyncio.gather(*map(execute_group, leading_groups))
    if not all(group_results):
        print("At least one group failed; skipping final group")
        return
    await execute_group(final_group)
if __name__ == "__main__":
    # client_flow is an async flow: calling it only creates a coroutine, so
    # it must be driven by an event loop or the flow never runs.
    asyncio.run(client_flow())
Tip: If you prefer non-blocking sleeps within tasks, convert ps_task to async and use asyncio.sleep. Concurrency patterns doc: Prefect docs, "Run work concurrently".
Kiran
09/23/2025, 1:04 PM
Marvin
09/23/2025, 1:04 PM
Marvin
09/23/2025, 1:06 PMfrom prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import asyncio
@task(log_prints=True)
async def ps_task(name, delay=5):
    """Simulated async job: log, optionally fail, then sleep without blocking.

    Raises:
        Exception: for ``"PS_TPLSweep_General_PSwaps"`` (simulated failure).
    """
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    # This specific job gets an extra, non-blocking 10-second pause.
    is_slow_job = name == "PS_Extend_General_PSwaps"
    if is_slow_job:
        await asyncio.sleep(10)
    await asyncio.sleep(delay)
    return f"{name} done"
@task(log_prints=True)
def skipping_task(task_name):
    """Marker task that makes a skipped group visible in the UI."""
    marker = f"{task_name} group skipped"
    return marker
async def execute_group(group):
    """Run a group's tasks sequentially while letting other groups proceed.

    Each task is chained to its predecessor with ``wait_for``. The future's
    blocking ``wait`` runs in a worker thread via ``asyncio.to_thread``, so
    this coroutine yields to the event loop. Groups gathered with
    ``asyncio.gather`` then genuinely overlap. ``fut.wait()`` is not
    awaitable itself, so awaiting it directly fails.

    Returns:
        True if the entire group completed, False if it halted early because
        a task did not complete. In that case a ``skipping_task`` is
        submitted as a marker.
    """
    previous_future = None
    for task_name in group:
        # No return_state=True: that makes .submit() block and return a
        # State instead of a future.
        fut = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future is not None else None,
        )
        # Non-blocking (for the event loop) wait for a final state.
        await asyncio.to_thread(fut.wait)
        state = fut.state
        if state.is_completed():
            print(f"{task_name} completed")
            previous_future = fut
        else:
            print(f"{task_name} failed → skipping rest of group")
            skipping_task.submit(task_name)
            return False
    return True
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    """Start the non-final groups together; run the final group only if all completed."""
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]
    *leading_groups, final_group = client_tasks
    # One coroutine per non-final group, all awaited together.
    group_results = await asyncio.gather(*map(execute_group, leading_groups))
    if not all(group_results):
        print("At least one group failed; skipping final group")
        return
    await execute_group(final_group)
if __name__ == "__main__":
    # client_flow is async: calling it returns a coroutine, which must be run
    # on an event loop (asyncio.run) for the flow to actually execute.
    asyncio.run(client_flow())
Notes:
- ps_task is async and uses only asyncio.sleep.
- Groups start together via asyncio.gather; tasks within each group are chained with wait_for so they run one-after-another.
- We keep the “skipping” action as a Prefect task for visibility in the UI.
More on concurrency patterns: Run work concurrently