hi @Marvin i have a list of tasks, list1=[[a,...
# ask-community
k
Hi @Marvin, I have a list of tasks, list1=[[a,b],[c,d,e],[f],[g]]. The first task of each sublist should trigger at the same time, and every other task in a sublist should start as soon as the previous task in that sublist completes successfully. The last sublist should execute only after all the previous sublists have completed successfully. Can you give the Prefect code? I want all the tasks to be visible in the UI.
m
thought for 238.5 seconds
Got it! Here’s a Prefect 3.x pattern that does exactly what you described:
- The first task in each sublist starts at the same time
- Tasks within each sublist run sequentially (each starts as soon as the previous one succeeds)
- The last sublist only starts after all previous sublists complete successfully
- All tasks are submitted so they’re visible in the UI immediately (downstream tasks will show as Scheduled/Pending until their upstream completes)

Replace `a`, `b`, `c`, `d`, `e`, `f`, `g` with your real tasks.
from prefect import flow, task

# Example tasks (replace these with your real ones)
@task(name="a", log_prints=True)
def a():
    print("Running a")

@task(name="b", log_prints=True)
def b():
    print("Running b")

@task(name="c", log_prints=True)
def c():
    print("Running c")

@task(name="d", log_prints=True)
def d():
    print("Running d")

@task(name="e", log_prints=True)
def e():
    print("Running e")

@task(name="f", log_prints=True)
def f():
    print("Running f")

@task(name="g", log_prints=True)
def g():
    print("Running g")


@flow(name="Grouped chained tasks with gated final group")
def orchestrate():
    # Your list of task groups
    groups = [
        [a, b],       # group 1 runs a -> b
        [c, d, e],    # group 2 runs c -> d -> e
        [f],          # group 3 runs f
        [g],          # last group: runs only after all previous groups complete
    ]

    # Schedule all groups except the last one
    last_futures_of_prior_groups = []
    for group in groups[:-1]:
        # Kick off the first task in this group; all first tasks run concurrently
        first_future = group[0].submit()
        prev = first_future

        # Chain the rest of the tasks in this group
        for t in group[1:]:
            prev = t.submit(wait_for=[prev])  # wait_for takes a list of upstream futures

        # Keep the last future in each prior group
        last_futures_of_prior_groups.append(prev)

    # Schedule the last group to start only after all prior groups have completed successfully
    last_group = groups[-1]
    prev = last_group[0].submit(wait_for=last_futures_of_prior_groups)
    for t in last_group[1:]:
        prev = t.submit(wait_for=[prev])  # wait_for takes a list of upstream futures

    # Optionally wait for the entire flow to finish by resolving the last future
    return prev.result()


if __name__ == "__main__":
    orchestrate()
Notes:
- Using `.submit()` for every task ensures each run is registered and visible in the UI immediately.
- The `wait_for=` dependency makes downstream tasks start as soon as their upstream completes successfully. If any upstream fails, Prefect will not run the dependents; they are typically left in a `NotReady` state rather than Cancelled (so the last group won’t run unless all prior groups complete successfully).
- If you want different failure behavior (e.g., continue even if a prior group fails), we can adjust with `allow_failure` or custom logic.

If your tasks take inputs/outputs, we can thread those into `.submit()` calls as args/kwargs while keeping the same structure. Want me to adapt this to your exact functions and parameters?
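To make the wiring easier to check, here is a small plain-Python sketch of the dependency map the flow above builds with `wait_for`. It is not Prefect code; `build_dependencies` is a hypothetical helper, and the task names are just the placeholder letters from the question.

```python
from typing import Dict, List


def build_dependencies(groups: List[List[str]]) -> Dict[str, List[str]]:
    """Map each task name to the task names it must wait for."""
    deps: Dict[str, List[str]] = {}
    *prior_groups, last_group = groups

    # Within a prior group, each task waits only for its predecessor
    for group in prior_groups:
        for i, name in enumerate(group):
            deps[name] = [group[i - 1]] if i > 0 else []

    # The last group's first task waits for the tail of every prior group
    gate = [group[-1] for group in prior_groups if group]
    for i, name in enumerate(last_group):
        deps[name] = [last_group[i - 1]] if i > 0 else gate
    return deps


deps = build_dependencies([["a", "b"], ["c", "d", "e"], ["f"], ["g"]])
print(deps)
```

Tasks with an empty list (`a`, `c`, `f`) are the ones that can start together; `g` is gated on the last task of every earlier group.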
k
Hi @Marvin, for the successful case this code is fine, but when a task in a group fails, the remaining tasks should show in the graph in the UI with a skipped state. If the previous groups have not all completed successfully, the final group's task should also be shown as skipped.
m
thought for 369.3 seconds
Got it, thanks for the context. Quick heads-up: in Prefect 3.x there is no “Skipped” state. The closest representation in the UI is “Cancelled.” So to achieve “show remaining tasks as skipped,” the supported pattern is to cancel the remaining tasks when one in the group fails; they’ll show as Cancelled in the UI.

Similarly, if earlier groups aren’t all successful, you can either:
(a) not submit the final group at all (they won’t appear in the graph), or
(b) submit them and immediately cancel them so they appear as Cancelled.

Reference: Prefect 3.x states

Below is a pattern you can adopt for “fail-fast per group” and “cancel final group if any prior group didn’t fully complete”:
from prefect import flow, task, get_run_logger
from prefect.futures import as_completed, resolve_futures_to_states

@task
def work(i: int) -> int:
    # Example task; raise on some condition to simulate failure
    if i == 3:
        raise ValueError("Boom")
    return i * 2

def run_group(group_name: str, items: list[int]):
    logger = get_run_logger()
    futures = [work.submit(i) for i in items]

    failed = False
    # Fail fast: as soon as any task errors, cancel the rest
    for fut in as_completed(futures):
        try:
            fut.result()  # will re-raise the task's exception if it failed
        except Exception as exc:
            failed = True
            logger.error(f"{group_name}: task failed: {exc}; cancelling remaining tasks in this group")
            for f in futures:
                if not f.done():
                    f.cancel()  # will show as Cancelled in the UI
            break

    # Optional: resolve remaining futures to finalize states in the UI
    _ = resolve_futures_to_states(futures)
    return futures, not failed

@flow
def pipeline():
    logger = get_run_logger()

    # Group 1
    g1_futures, g1_ok = run_group("group-1", list(range(5)))

    # Group 2 (only if group 1 fully succeeded)
    if g1_ok:
        g2_futures, g2_ok = run_group("group-2", list(range(5, 10)))
    else:
        g2_futures, g2_ok = [], False

    # Final group should be "skipped" (i.e., show as Cancelled) if any previous group failed
    if g1_ok and g2_ok:
        final_futures, _ = run_group("final-group", list(range(10, 12)))
    else:
        # Optionally submit final tasks and immediately cancel them so the UI shows them as Cancelled
        logger.info("Previous groups not fully successful; marking final group as cancelled")
        final_futures = [work.submit(i) for i in [10, 11]]
        for f in final_futures:
            f.cancel()
        _ = resolve_futures_to_states(final_futures)

if __name__ == "__main__":
    pipeline()
Notes:
- Cancel is best-effort: tasks already running may still complete or fail before cancellation takes effect.
- If you prefer a single “collector” task instead of submitting the final group, you can guard it with an aggregate check and raise an “abort” from within the task to set it to Cancelled:
from prefect import task
from prefect.exceptions import Abort

@task
def final_task(prereqs_ok: bool):
    if not prereqs_ok:
        raise Abort("Skipping final task because prerequisites failed")
    ...
- If you don’t submit a task at all, it won’t appear in the graph; to have it appear “skipped-like,” you must submit and then cancel it.

If you share your current code snippet (how you’re creating “groups” and wiring dependencies), I can tailor this pattern to your exact structure.
k
@Marvin just a change: I need the first tasks in all groups to run at once, except for the last task, which is dependent on the other groups
m
thought for 102.7 seconds
Great — here’s a simple change to run the first task from each group concurrently, then continue each group independently. If a group’s first task fails, we immediately submit-and-cancel the rest of that group so they show as Cancelled in the UI (Prefect 3.x doesn’t have a Skipped state).
from prefect import flow, task, get_run_logger
from prefect.futures import as_completed, resolve_futures_to_states

@task
def work(group: str, idx: int, payload: int) -> int:
    # Replace this logic with your real work
    # Example failure:
    if payload == 3:
        raise ValueError(f"{group}[{idx}] failed")
    return payload * 2

def run_group_remaining(group_name: str, items: list[int], start_idx: int = 1):
    # Run the remaining tasks in this group with fail-fast and cancel-the-rest
    logger = get_run_logger()
    futures = []
    failed = False
    # Submit incrementally so we can stop early on failure (prevents extra work)
    for idx in range(start_idx, len(items)):
        f = work.submit(group=group_name, idx=idx, payload=items[idx])
        futures.append(f)
        try:
            f.result()  # if this fails, cancel remaining not-yet-submitted items
        except Exception as exc:
            failed = True
            logger.error(f"{group_name}: task {idx} failed: {exc}; cancelling remaining tasks in this group")
            # Submit-and-cancel the rest so they show as Cancelled in the UI
            for j in range(idx + 1, len(items)):
                fj = work.submit(group=group_name, idx=j, payload=items[j])
                fj.cancel()
                futures.append(fj)
            break

    # Finalize states in the UI
    _ = resolve_futures_to_states(futures)
    # Return True only if no task in this function failed
    return not failed

@flow
def pipeline():
    logger = get_run_logger()

    groups = {
        "group-1": [0, 1, 2, 3, 4],       # 3 will fail in our example
        "group-2": [5, 6, 7, 8, 9],
        "group-3": [10, 11],
    }

    # 1) Kick off FIRST TASK of every group concurrently
    first_futures = {
        name: work.submit(group=name, idx=0, payload=items[0])
        for name, items in groups.items()
        if items  # non-empty
    }

    # 2) Wait for first tasks; mark groups that fail and cancel their remaining tasks
    group_ok = {name: True for name in groups}
    all_futures = list(first_futures.values())

    for name, f in first_futures.items():
        try:
            f.result()
        except Exception as exc:
            group_ok[name] = False
            logger.error(f"{name}: first task failed: {exc}; cancelling remaining tasks in this group")
            # Submit-and-cancel remaining tasks so they appear as Cancelled in the UI
            for idx in range(1, len(groups[name])):
                fj = work.submit(group=name, idx=idx, payload=groups[name][idx])
                fj.cancel()
                all_futures.append(fj)

    # 3) For groups whose first task succeeded, run remaining tasks (one group at a time, because run_group_remaining blocks)
    for name, items in groups.items():
        if group_ok[name] and len(items) > 1:
            _ok = run_group_remaining(name, items, start_idx=1)
            group_ok[name] = group_ok[name] and _ok

    # 4) Final group: run if ALL previous groups fully succeeded; otherwise submit-and-cancel
    final_items = [100, 101]
    all_prev_ok = all(group_ok.values())

    if all_prev_ok:
        final_futures = [work.submit(group="final", idx=i, payload=v) for i, v in enumerate(final_items)]
    else:
        logger.info("Previous groups not fully successful; cancelling final group")
        final_futures = [work.submit(group="final", idx=i, payload=v) for i, v in enumerate(final_items)]
        for f in final_futures:
            f.cancel()

    all_futures.extend(final_futures)
    resolve_futures_to_states(all_futures)

if __name__ == "__main__":
    pipeline()
What this does:
- First tasks from all groups run at once.
- If a group’s first task fails, its remaining tasks are submitted and immediately cancelled so they show as Cancelled in the UI.
- Groups whose first task succeeded continue with their remaining tasks, failing fast and cancelling the rest if any later task fails.
- If any group doesn’t complete successfully, the “final group” tasks are submitted and cancelled to show as Cancelled.

Note: Prefect 3.x does not have a Skipped state; Cancelled is the supported visual equivalent in the UI: States in Prefect 3.x.

If you share your task signatures and how you organize groups, I can tailor this to your exact code.
k
@Marvin just a change: I need the first tasks in all groups to run at once, except for the last task, which is dependent on the other groups (edited)
m
thought for 60.2 seconds
Got it. Here’s an updated pattern where:
- The first task from every group runs concurrently
- The “dependent” last group is excluded from that first wave and only runs if all other groups finish successfully
- If any task in a group fails, we cancel the rest of that group so they show as Cancelled in the UI (Prefect 3.x has no Skipped state)

```
from prefect import flow, task, get_run_logger
from prefect.futures import as_completed, resolve_futures_to_states

@task
def work(group: str, idx: int, payload: int) -> int:
    # Replace with real logic. Example failure:
    if payload == 3:
        raise ValueError(f"{group}[{idx}] failed")
    return payload * 2

def run_group_remaining(group_name: str, items: list[int], start_idx: int = 1) -> bool:
    """
    Submit all remaining tasks in the group concurrently.
    Fail-fast: on first failure, cancel all not-yet-finished tasks in this group.
    Returns True if no task failed, else False.
    """
    logger = get_run_logger()
    if len(items) <= start_idx:
        return True

    futures = [
        work.submit(group=group_name, idx=idx, payload=items[idx])
        for idx in range(start_idx, len(items))
    ]

    failed = False
    for f in as_completed(futures):
        try:
            f.result()
        except Exception as exc:
            failed = True
            logger.error(f"{group_name}: a task failed: {exc}; cancelling remaining tasks in this group")
            for ff in futures:
                if not ff.done():
                    ff.cancel()
            break

    resolve_futures_to_states(futures)
    return not failed

@flow
def pipeline():
    logger = get_run_logger()

    # Define your groups and pick which one is the "dependent" final group
    groups = {
        "group-1": [0, 1, 2, 3, 4],  # 3 will fail in our example
        "group-2": [5, 6, 7, 8, 9],
        "group-3": [10, 11],
        "final": [100, 101],  # dependent group
    }
    dependent_group_name = "final"

    # 1) Kick off FIRST TASK from every non-dependent group concurrently
    first_futures = {
        name: work.submit(group=name, idx=0, payload=items[0])
        for name, items in groups.items()
        if items and name != dependent_group_name
    }
    all_futures = list(first_futures.values())

    # Track group health (excluding the dependent group)
    group_ok = {name: True for name in groups.keys() if name != dependent_group_name}

    # 2) Inspect first tasks; if any failed, submit-and-cancel the rest of that group
    for name, f in first_futures.items():
        try:
            f.result()
        except Exception as exc:
            group_ok[name] = False
            logger.error(f"{name}: first task failed: {exc}; cancelling remaining tasks in this group")
            # Submit-and-cancel remaining tasks so they appear as Cancelled in the UI
            for idx in range(1, len(groups[name])):
                fj = work.submit(group=name, idx=idx, payload=groups[name][idx])
                fj.cancel()
                all_futures.append(fj)

    # 3) For groups whose first task succeeded, run remaining tasks concurrently within each group
    for name, items in groups.items():
        if name == dependent_group_name:
            continue
        if group_ok[name] and len(items) > 1:
            rem_ok = run_group_remaining(name, items, start_idx=1)
            group_ok[name] = group_ok[name] and rem_ok

    # 4) Dependent final group: only run if ALL non-dependent groups fully succeeded
    all_prev_ok = all(group_ok.values())
    final_items = groups[dependent_group_name]

    if all_prev_ok:
        final_futures = [
            work.submit(group=dependent_group_name, idx=i, payload=v)
            for i, v in enumerate(final_items)
        ]
    else:
        logger.info("Previous groups not fully successful; cancelling dependent final group")
        final_futures = [
            work.submit(group=dependent_group_name, idx=i, payload=v)
            for i, v in enumerate(final_items)
        ]
        for f in final_futures:
            f.cancel()

    all_futures.extend(final_futures)
    resolve_futures_to_states(all_futures)

if __name__ == "__main__":
    pipeline()
```

Notes:
- Prefect 3.x does not have a Skipped state; Cancelled is the supported visual equivalent in the UI: States in Prefect 3.x.
- If by “last task” you meant “the last task inside each group depends on other groups,” say the word and I’ll adjust the wiring to model that cross-group dependency per-group.
k
@Marvin another correction: all tasks in a group are dependent on the previous task (if any)
m
thought for 91.4 seconds
Perfect, here’s an updated pattern that enforces sequential dependencies within each group (each task waits for the previous one), while still:
- Starting the first task of every non-dependent group at the same time
- Keeping the final “dependent” group out of that first wave and only running it if all other groups complete successfully
- Cancelling the remaining tasks in any group after a failure so they appear as Cancelled in the UI (Prefect 3.x has no Skipped state)
from prefect import flow, task, get_run_logger
from prefect.futures import resolve_futures_to_states

@task
def work(group: str, idx: int, payload: int, upstream=None) -> int:
    # Replace with your real logic. Example: fail when payload == 3
    if payload == 3:
        raise ValueError(f"{group}[{idx}] failed")
    return payload * 2

def submit_group_chain(group_name: str, items: list[int]):
    """
    Submit a sequential chain for a group:
    - task[i] depends on task[i-1] via the 'upstream' argument
    Returns the list of futures in order.
    """
    futures = []
    prev = None
    for idx, val in enumerate(items):
        f = work.submit(group=group_name, idx=idx, payload=val, upstream=prev)
        futures.append(f)
        prev = f
    return futures

def monitor_group_chain(group_name: str, futures: list):
    """
    Wait through the chain in order; on first failure, cancel remaining tasks.
    Returns True if all tasks completed successfully, else False.
    """
    logger = get_run_logger()
    ok = True
    for i, f in enumerate(futures):
        try:
            f.result()
        except Exception as exc:
            ok = False
            logger.error(f"{group_name}: task {i} failed: {exc}; cancelling remaining tasks in this group")
            for ff in futures[i+1:]:
                try:
                    ff.cancel()
                except Exception:
                    pass
            break

    resolve_futures_to_states(futures)
    return ok

@flow
def pipeline():
    logger = get_run_logger()

    # Define your groups and choose the dependent (final) group
    groups = {
        "group-1": [0, 1, 2, 3, 4],       # 3 will fail in this example
        "group-2": [5, 6, 7, 8, 9],
        "group-3": [10, 11],
        "final":   [100, 101],            # depends on all other groups succeeding
    }
    dependent_group_name = "final"

    # Submit chains for all non-dependent groups.
    # Because each group's first task has no upstream, all first tasks start concurrently.
    non_dependent = {k: v for k, v in groups.items() if k != dependent_group_name}
    group_futures = {name: submit_group_chain(name, items) for name, items in non_dependent.items()}

    # Monitor each group; if a task fails, cancel the remaining tasks in that group
    group_ok = {}
    for name, futures in group_futures.items():
        group_ok[name] = monitor_group_chain(name, futures)

    all_prev_ok = all(group_ok.values())
    final_items = groups[dependent_group_name]

    # Handle dependent final group
    if all_prev_ok:
        # Run the final group's chain (sequentially via upstream deps)
        final_futures = submit_group_chain(dependent_group_name, final_items)
        final_ok = monitor_group_chain(dependent_group_name, final_futures)
        logger.info(f"Final group success: {final_ok}")
    else:
        # Submit-and-cancel the final group's tasks so they appear as Cancelled in the UI
        logger.info("Previous groups not fully successful; cancelling dependent final group")
        final_futures = submit_group_chain(dependent_group_name, final_items)
        for f in final_futures:
            f.cancel()
        resolve_futures_to_states(final_futures)

if __name__ == "__main__":
    pipeline()
Notes:
- Passing the previous task’s future as the `upstream` argument enforces the per-group chain (task[i] waits on task[i-1]).
- If any task fails in a group, the code cancels the rest so they’re visible as Cancelled in the UI. Prefect 3.x doesn’t have a Skipped state: States in Prefect 3.x.
- If you want the final group’s tasks to be “guaranteed not to start” before cancellation, we can add a gating mechanism; otherwise, the submit-and-cancel approach typically works well in practice.
k
@Marvin will wait_for execute the next task only if the previous task completed successfully?
m
thought for 306.5 seconds
Great question. Assuming you’re on Prefect 3.x:

Short answer: Yes. A task/flow that uses `wait_for=[upstream]` will not start until the upstream has successfully completed. If the upstream fails or is cancelled (after exhausting retries, if any), the downstream will not run; Prefect typically leaves it in a `NotReady` state rather than marking it Cancelled.

Notes:
- If the upstream is retrying, the downstream waits until the upstream reaches a terminal state (Completed/Failed/Cancelled).
- If you need the downstream to run even when the upstream fails, wrap the upstream with `allow_failure`.

Example:
from prefect import flow, task
from prefect.utilities.annotations import allow_failure

@task
def upstream():
    ...

@task
def downstream():
    ...

@task
def downstream_even_if_upstream_failed(_):
    ...

@flow
def my_flow():
    u = upstream.submit()

    # Runs only if upstream completes successfully
    d = downstream.submit(wait_for=[u])

    # Runs even if upstream fails
    d_anyway = downstream_even_if_upstream_failed.submit(wait_for=[allow_failure(u)])
Useful links:
- Run work concurrently (covers waiting/dependencies)
- Task run states (what “Completed/Failed/Cancelled” mean)

If you’re on a different version or want to handle failures in a specific way (e.g., branching on upstream state), let me know and I can tailor an example.
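As a way to reason about what `wait_for` does to a whole graph, here is a toy model in plain Python. It is a sketch, not Prefect code: `final_states` is a hypothetical helper, and the `NotReady` label is an assumption about how a blocked downstream run is reported.

```python
from typing import Dict, List, Set


def final_states(deps: Dict[str, List[str]], failing: Set[str]) -> Dict[str, str]:
    """Resolve each task to Completed, Failed, or NotReady.

    `deps` must list every task after the tasks it waits for.
    """
    states: Dict[str, str] = {}
    for name, upstream in deps.items():
        if any(states[u] != "Completed" for u in upstream):
            states[name] = "NotReady"  # an upstream did not complete, so this never runs
        elif name in failing:
            states[name] = "Failed"
        else:
            states[name] = "Completed"
    return states


deps = {"a": [], "b": ["a"], "c": [], "d": ["c"], "e": ["d"], "f": [], "g": ["b", "e", "f"]}
states = final_states(deps, failing={"d"})
print(states)
```

With `d` failing, everything downstream of it (`e`, and therefore the gated `g`) never runs, while the other chains are unaffected.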
k
@Marvin how to check if a future has reached the final state of execution
m
thought for 107.4 seconds
In Prefect 3.x, you’ve got a few options to check if a PrefectFuture is in a final state (Completed, Failed, Cancelled, or Crashed):

- Fast boolean check
f.state.is_final()  # True if the future's task run has reached a final state
- Inspect the state explicitly
state = f.state  # current state of the task run behind the future
state.is_final()  # True if final (Completed/Failed/Cancelled/Crashed)
- For multiple futures (optionally with a timeout)
from prefect.futures import wait

result = wait([f1, f2], timeout=0.0)  # non-blocking check
done = result.done
not_done = result.not_done

Notes:
- `f.result()` will block until the future is final; use it only if you want to wait.
- Final states include Completed, Failed, Cancelled, and Crashed; non-final states include Pending, Running, AwaitingRetry, etc.

Refs:
- Concurrency patterns (as_completed, wait): Run work concurrently
- State model details: Task run states
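For comparison, the same "done / not done" split can be tried out with the standard library's `concurrent.futures`, which has a similarly shaped `wait` function. This is only an analogy to get a feel for the non-blocking check, not Prefect's API; `snapshot` is a hypothetical helper name.

```python
import concurrent.futures as cf
import threading


def snapshot(futures):
    """Split futures into (done, not_done) without blocking."""
    result = cf.wait(futures, timeout=0)
    return result.done, result.not_done


release = threading.Event()
with cf.ThreadPoolExecutor(max_workers=2) as pool:
    quick = pool.submit(lambda: "quick")
    slow = pool.submit(release.wait)  # stays unfinished until the event is set
    quick.result()                    # block until the quick future is final
    try:
        done, not_done = snapshot([quick, slow])
    finally:
        release.set()                 # let the slow future finish so the pool can shut down
```

The `timeout=0` call returns immediately with whatever has finished so far, which is the behaviour you want when polling instead of waiting.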
k
@Marvin
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.states import Cancelled
import asyncio
import time

@task(log_prints=True)
def ps_task(name, delay=5):
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        time.sleep(10)
    time.sleep(delay)
    return f"{name} done"

@task
def skipping_task(task_name):
    return Cancelled(message=f"{task_name} task is skipping", name="SKIPPING")

async def execute_group(group):
    previous_task_state = None
    for task_name in group:
        task_future = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_task_state] if previous_task_state else None,
            return_state=True,
        )
        # Wait until task is finished
        task_state = await task_future.wait()
        if task_state.is_completed():
            print(f"{task_name} completed")
        else:
            print(f"{task_name} failed → skipping rest of group")
            skipping_task(task_name)
            break
        previous_task_state = task_future

@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
async def client_flow():
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]
    # Run all but last group concurrently
    non_final_groups = client_tasks[:-1]
    await asyncio.gather(*(execute_group(g) for g in non_final_groups))
    # Run final group only if all others completed
    await execute_group(client_tasks[-1])

if __name__ == "__main__":
    asyncio.run(client_flow())

For this code, why are the three groups not getting triggered at once?
m
thought for 221.6 seconds
Likely causes:

1) You’re on Prefect 3.x but using ConcurrentTaskRunner (2.x name)
- In Prefect 3.x the runner you want is ThreadPoolTaskRunner (or ProcessPoolTaskRunner). ConcurrentTaskRunner is the 2.x name, so on 3.x prefer the 3.x name and pass the runner to the flow explicitly so tasks are submitted to a thread pool.
- Fix (Prefect 3.x):
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    ...
2) time.sleep blocking
- Your task uses time.sleep. If your runner is not actually using threads/processes, sleep will block and serialize execution.
- Options:
  - Keep ps_task sync but ensure a concurrent runner (ThreadPoolTaskRunner/ProcessPoolTaskRunner) is set as above.
  - Or make the task async and use asyncio.sleep to avoid blocking:
import asyncio

from prefect import task

@task(log_prints=True)
async def ps_task(name, delay=5):
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        await asyncio.sleep(10)
    await asyncio.sleep(delay)
    return f"{name} done"
3) Notes on return_state and wait_for
- With return_state=True, .submit() waits for the task run to finish and returns its final State rather than a future. Inside execute_group that means each call blocks the event loop until the task is done, so asyncio.gather cannot start the other groups, and there is no future left to call .wait() on. Drop return_state=True to get a future back.
- You’re passing wait_for=[previous_task_state], where previous_task_state is whatever the previous .submit() call returned. The name is a bit misleading; wait_for should receive the previous future.
- If you want the “skipping” action to be tracked as a Prefect task, submit it:
skipping_task.submit(task_name)
Quick sanity check
- With a proper concurrent task runner, your three non-final groups should start together:
  - Group 1 first task sleeps ~15s
  - Group 2 first task ~5s
  - Group 3 first task ~5s
- You should see all three first tasks start near the same time.

Docs
- Concurrency patterns: Run work concurrently

If you can confirm your Prefect version (2.x vs 3.x), I can give you the exact snippet to drop in.
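If it helps to see the blocking effect in isolation, here is a standard-library-only sketch (no Prefect involved). It runs three coroutines through `asyncio.gather`, once with a blocking `time.sleep` and once with `asyncio.sleep`; the function names and the 0.1 s delay are made up for illustration.

```python
import asyncio
import time


async def blocking_job(delay: float) -> None:
    time.sleep(delay)           # holds the event loop; no other coroutine can run


async def cooperative_job(delay: float) -> None:
    await asyncio.sleep(delay)  # yields to the event loop while waiting


async def timed(job, n: int = 3, delay: float = 0.1) -> float:
    """Run n copies of job through gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(job(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_job))
cooperative_elapsed = asyncio.run(timed(cooperative_job))
print(f"blocking: {blocking_elapsed:.2f}s, cooperative: {cooperative_elapsed:.2f}s")
```

The blocking version takes roughly the sum of the delays because each coroutine runs to completion before the next one starts, which is the same serialization you are seeing across your groups.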
k
@Marvin I am using Prefect 3.0, can you give the final code now?
m
thought for 103.5 seconds
Here’s a Prefect 3.0 version that starts the three non-final groups concurrently and executes tasks within each group sequentially. It uses ThreadPoolTaskRunner and submits the “skipping” task properly. It also only runs the final group if all prior groups fully completed.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.states import Cancelled
import asyncio
import time


@task(log_prints=True)
def ps_task(name, delay=5):
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        time.sleep(10)
    time.sleep(delay)
    return f"{name} done"


@task(log_prints=True)
def skipping_task(task_name):
    # Returning a manually created state sets this task run's final state,
    # so the run should show up as Cancelled (named SKIPPING) in the UI.
    return Cancelled(message=f"{task_name} task is skipping", name="SKIPPING")


async def execute_group(group):
    """
    Runs a group sequentially. Returns True if the entire group completed,
    False if it halted early due to a failure.
    """
    previous_future = None
    for task_name in group:
        # Submit this task; ensure it waits for the prior task in the group.
        # No return_state=True here: that would block and return a State, not a future.
        task_future = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future else None,
        )

        # Wait in a worker thread so the event loop can start the other groups,
        # then read the task run's final state
        await asyncio.to_thread(task_future.wait)
        state = task_future.state

        if state.is_completed():
            print(f"{task_name} completed")
        else:
            print(f"{task_name} failed → skipping rest of group")
            skipping_task.submit(task_name).wait()  # wait so the marker run finishes before the flow ends
            return False

        previous_future = task_future

    return True


@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]

    # Run all but last group concurrently
    non_final_groups = client_tasks[:-1]
    results = await asyncio.gather(*(execute_group(g) for g in non_final_groups))

    # Run final group only if all others completed
    if all(results):
        await execute_group(client_tasks[-1])
    else:
        print("At least one group failed; skipping final group")


if __name__ == "__main__":
    # client_flow is async, so calling it returns a coroutine that must be run
    asyncio.run(client_flow())
Tip: If you prefer non-blocking sleeps within tasks, convert ps_task to async and use asyncio.sleep.

Concurrency patterns doc: Run work concurrently
k
@Marvin I want non-blocking sleeps only, can you give the code by improving this logic?
m
thought for 76.6 seconds
Absolutely — here’s your flow updated for non-blocking sleeps and concurrent group starts in Prefect 3.0. Tasks are async (using asyncio.sleep), groups run concurrently, and each group runs sequentially within itself.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import asyncio


@task(log_prints=True)
async def ps_task(name, delay=5):
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        # extra delay for this specific task, non-blocking
        await asyncio.sleep(10)
    await asyncio.sleep(delay)
    return f"{name} done"


@task(log_prints=True)
def skipping_task(task_name):
    # Marker/log task for a skipped group
    return f"{task_name} group skipped"


async def execute_group(group):
    """
    Runs tasks in a group sequentially. Returns True if the entire group completed,
    False if it halted early due to a failure.
    """
    previous_future = None

    for task_name in group:
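        # No return_state=True here: that would block and return a State, not a future
        fut = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future else None,
        )

        # Non-blocking wait: fut.wait() runs in a worker thread, then we read the final state
        await asyncio.to_thread(fut.wait)
        state = fut.state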

        if state.is_completed():
            print(f"{task_name} completed")
            previous_future = fut
        else:
            print(f"{task_name} failed → skipping rest of group")
            skipping_task.submit(task_name).wait()  # wait so the marker run finishes before the flow ends
            return False

    return True


@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]

    # Run all but last group concurrently
    non_final_groups = client_tasks[:-1]
    results = await asyncio.gather(*(execute_group(g) for g in non_final_groups))

    # Run final group only if all others completed
    if all(results):
        await execute_group(client_tasks[-1])
    else:
        print("At least one group failed; skipping final group")


if __name__ == "__main__":
    # client_flow is async, so calling it returns a coroutine that must be run
    asyncio.run(client_flow())
Notes:
- ps_task is async and uses only asyncio.sleep.
- Groups start together via asyncio.gather; tasks within each group are chained with wait_for so they run one-after-another.
- We keep the “skipping” action as a Prefect task for visibility in the UI.

More on concurrency patterns: Run work concurrently
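If you want to check the control flow without a Prefect server, here is a plain-asyncio sketch of the same orchestration: groups run concurrently, tasks inside a group run in order, and the final group is gated on the others. Everything in it (`run_task`, `run_group`, `orchestrate`, the shortened task names, the "Skipped" label) is hypothetical and only stands in for the Prefect tasks above.

```python
import asyncio
from typing import Dict, List, Set


async def run_task(name: str, failing: Set[str]) -> bool:
    await asyncio.sleep(0.01)  # stand-in for real, non-blocking work
    return name not in failing


async def run_group(group: List[str], failing: Set[str], states: Dict[str, str]) -> bool:
    """Run one group in order; mark everything after a failure as Skipped."""
    for i, name in enumerate(group):
        if await run_task(name, failing):
            states[name] = "Completed"
        else:
            states[name] = "Failed"
            states.update({rest: "Skipped" for rest in group[i + 1:]})
            return False
    return True


async def orchestrate(groups: List[List[str]], failing: Set[str]) -> Dict[str, str]:
    states: Dict[str, str] = {}
    *prior, final = groups
    # All prior groups start together; each one is sequential internally
    results = await asyncio.gather(*(run_group(g, failing, states) for g in prior))
    if all(results):
        await run_group(final, failing, states)
    else:
        states.update({name: "Skipped" for name in final})
    return states


states = asyncio.run(orchestrate(
    [["extend", "sweep"], ["generate", "mtm", "sweeping"], ["loan"], ["reporting"]],
    failing={"generate"},
))
print(states)
```

In the Prefect version, the places where this sketch writes "Skipped" into the dictionary are where you would submit the marker task for each remaining task name.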