hi <@ULVA73B9P> from prefect import flow, task ...
# ask-community
k
hi @Marvin from prefect import flow, task from prefect.task_runners import ThreadPoolTaskRunner import asyncio @task(log_prints=True) async def ps_task(name, delay=5): print(f"Running {name}") if name == "PS_TPLSweep_General_PSwaps": raise Exception("custom failure") if name == "PS_Extend_General_PSwaps": # extra delay for this specific task, non-blocking await asyncio.sleep(10) pass await asyncio.sleep(delay) return f"{name} done" @task(log_prints=True) def skipping_task(task_name): # Marker/log task for a skipped group return f"{task_name} group skipped" async def execute_group(group): """ Runs tasks in a group sequentially. Returns True if the entire group completed, False if it halted early due to a failure. """ previous_future = None for task_name in group: fut = ps_task.with_options(task_run_name=task_name).submit( task_name, wait_for=[previous_future] if previous_future else None, return_state=True, ) # Non-blocking wait for this task to reach a final state #state = await fut.wait() if fut.is_completed(): print(f"{task_name} completed") previous_future = fut else: print(f"{task_name} failed → skipping rest of group") skipping_task.submit(task_name) return False return True async def main(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], ["PS_LoanAccountSweeper"], ["PS_Reporting"], # final group ] # Run all but last group concurrently non_final_groups = client_tasks[:-1] tasks=[execute_group(g) for g in non_final_groups] results = await asyncio.gather(*tasks) # Run final group only if all others completed if all(results): await execute_group(client_tasks[-1]) else: print("At least one group failed; skipping final group") @flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True) async def client_flow(): # async def main(): # client_tasks = [ # ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], # ["PS_Generate", "PS_MTM", "PS_Sweeping"], # ["PS_LoanAccountSweeper"], # ["PS_Reporting"], # final group # ] # # Run all but last group concurrently # non_final_groups = client_tasks[:-1] # tasks=[execute_group(g) for g in non_final_groups] # results = await asyncio.gather(*tasks) # # Run final group only if all others completed # if all(results): # await execute_group(client_tasks[-1]) # else: # print("At least one group failed; skipping final group") asyncio.run(main()) if name == "__main__": # Call the flow directly (do not use asyncio.run for Prefect flows) client_flow() i am unable to trigger all groups cocurrently, how can i do it in this code
m
thought for 72.7 seconds
You’re very close — two main issues are preventing your groups from actually running concurrently: - You’re checking
fut.is_completed()
immediately after
submit()
, which will almost always be False because the task hasn’t finished yet. You need to await the future (e.g.,
await fut.wait()
). - Don’t call
asyncio.run()
inside a Prefect flow. Prefect already manages the event loop for async flows. Just
await
your async helpers from the flow. Below are two ways to fix it. The second “simpler pattern” is the most Prefect-native for running groups concurrently. Option A — minimal changes to your structure - Use
ConcurrentTaskRunner
for async tasks - Await
.wait()
inside
execute_group
- Call
await main()
inside the flow instead of
asyncio.run(main())
Copy code
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import asyncio

@task(log_prints=True)
async def ps_task(name, delay=5):
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        # extra delay for this specific task, non-blocking
        await asyncio.sleep(10)
    await asyncio.sleep(delay)
    return f"{name} done"

@task(log_prints=True)
def skipping_task(task_name):
    # Marker/log task for a skipped group
    return f"{task_name} group skipped"

async def execute_group(group):
    """
    Runs tasks in a group sequentially.
    Returns True if the entire group completed, False if it halted early due to a failure.
    """
    previous_future = None
    for task_name in group:
        fut = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future else None,
        )
        # Wait for this task to complete so we can decide whether to continue
        state = await fut.wait()
        if state.is_completed():
            print(f"{task_name} completed")
            previous_future = fut
        else:
            print(f"{task_name} failed → skipping rest of group")
            skipping_task.submit(task_name)
            return False
    return True

async def main():
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]

    # Run all but last group concurrently
    non_final_groups = client_tasks[:-1]
    results = await asyncio.gather(*(execute_group(g) for g in non_final_groups))

    # Run final group only if all others completed
    if all(results):
        await execute_group(client_tasks[-1])
    else:
        print("At least one group failed; skipping final group")

@flow(task_runner=ConcurrentTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    await main()

if __name__ == "__main__":
    client_flow()
Option B — simpler Prefect-native pattern (recommended) Let Prefect handle sequencing inside each group with
wait_for
, submit all groups at once, and just wait on the last task of each group concurrently. Then decide whether to run the final group. ``` from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner from prefect.futures import wait import asyncio @task(log_prints=True) async def ps_task(name, delay=5): print(f"Running {name}") if name == "PS_TPLSweep_General_PSwaps": raise Exception("custom failure") if name == "PS_Extend_General_PSwaps": await asyncio.sleep(10) await asyncio.sleep(delay) return f"{name} done" def submit_group(group): """Submit a sequential chain of tasks and return the last future.""" prev = None for name in group: fut = ps_task.with_options(task_run_name=name).submit( name, wait_for=[prev] if prev else None, ) prev = fut return prev # last task in the group
@flow(task_runner=ConcurrentTaskRunner(max_workers=8), log_prints=True) async def client_flow(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], ["PS_LoanAccountSweeper"], ["PS_Reporting"], # final group ] # Submit non-final groups concurrently; each group is internally sequential group_last_futures = [submit_group(g) for g in client_tasks[:-1]] # Wait for the last task in each non-final group concurrently states = await wait(group_last_futures) # Only run final group if all the non-final groups completed successfully if all(s.is_completed() for s in states): final_last = submit_group(client_tasks[-1]) await final_last.wait() else: print("At least one group failed; skipping final group") if name == "__main__": client_flow() ``` Notes - In Prefect 3.x,
.submit()
returns a Prefect future. To block until it’s finished and get its final state, use
await fut.wait()
. Prefect also provides a helper to wait for many futures concurrently:
await prefect.futures.wait([...])
. Docs: prefect.futures.wait - Since your tasks are async (
async def
+
await asyncio.sleep
),
ConcurrentTaskRunner
is a better fit than
ThreadPoolTaskRunner
. - Don’t use
asyncio.run()
inside a Prefect flow; just
await
your async functions from the flow.
k
@Marvin from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure from prefect.states import Cancelled # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_state=None,delay=20): if previous_state: print(f"previous_state is {previous_state}") # if not previous_state.is_completed(): # return Cancelled(message=f"skippping",name="SKIPPING") state = previous_state.get_state() if not state.is_completed(): # True if final return Cancelled(message=f"skippping",name="SKIPPING") print(f"Running {name}") if name=="PS_MTM": raise Exception("custom failure") # if name=="PS_Extend_General_PSwaps": # time.sleep(20) time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): # # Your list of task groups # groups = [ # ["a", "b"], # group 1 runs a -> b # ["c", "d", "e"], # group 2 runs c -> d -> e # ["f"], # group 3 runs f # "g", # last group: runs only after all previous groups complete # ] client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # Kick off the first task in this group; all first tasks run concurrently first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0]) prev = first_future # Chain the rest of the tasks in this group for t in group[1] prev = ps_task.with_options(task_run_name=t).submit(t,wait_for=[allow_failure(prev)],previous_state=prev) # Keep the last future in each prior group last_futures_of_prior_groups.append(prev) # Schedule the last group to start only after all prior groups have completed successfully last_group = client_tasks[-1] prev = ps_task.with_options(task_run_name=last_group).submit(last_group,wait_for=last_futures_of_prior_groups) # for t in last_group[1] # prev = ps_task.submit(t,wait_for=prev) # Optionally wait for the entire flow to finish by resolving the last future return prev.result() if name == "__main__": client_flow() in ps_task , i want to chck fi teh previous future is coompleted succesfully, how to do that,in the print statemet of ps_task iam getting previous_state is PS_Extend_General_PSwaps done, can i use thsi to check fi it is a successfuly completed or failed?
m
thought for 185.3 seconds
Great question — right now your
previous_state
parameter is receiving the previous task’s RESULT (a string like
PS_Extend_General_PSwaps done
), not its State. That’s why your print shows the string and why calling
.get_state()
won’t work. To inspect whether the upstream task completed or failed inside
ps_task
, pass the upstream future with
allow_failure(...)
as the argument value. That tells Prefect to deliver the upstream State object to your task argument instead of the resolved result. You’ll also want
allow_failure
in
wait_for
so Prefect doesn’t auto-cancel and you can decide in-task. What to change in your flow submission loop
Copy code
# Kick off the first task in this group
first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0])
prev = first_future

# Chain the rest of the tasks in this group
for t in group[1:]:
    prev = ps_task.with_options(task_run_name=t).submit(
        t,
        previous_state=allow_failure(prev),   # <-- pass State into the task
        wait_for=[allow_failure(prev)],       # <-- don't auto-cancel; let task decide
    )
How to check the upstream state in ps_task
Copy code
from prefect import task
from prefect.states import Cancelled, get_state_result

@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    if previous_state is not None:
        # previous_state is a State when you pass allow_failure(prev)
        print(f"previous_state: {previous_state!r}")  # e.g. <State Completed()>
        if not previous_state.is_completed():
            return Cancelled(message="skipping due to upstream state")

        # If you need the upstream result value:
        upstream_value = get_state_result(previous_state)
        print(f"Upstream result was: {upstream_value}")

    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
Alternative: if you don’t need to inspect the upstream state (you just want to skip on failure), you can drop the
previous_state
parameter entirely and rely on
wait_for=[prev]
without
allow_failure
. Prefect will automatically cancel downstream tasks when an upstream dependency fails. Refs: - States and state checks (
is_completed
,
is_failed
, etc.): Prefect 3 States - Getting a result from a State: get_state_result
k
@Marvin hwo is yteh future convertinnginto a state here, becuase as soon as the task is submitted , and print the furure iam getting teh future object (for sirst tasks in teh groups), but when the saem variabel is iam pasiing to the next task sin teh group , it is converting to PS_Extend_General_PSwaps done, how? , in the previous code , not afetr the changes you just suggested
m
thought for 30.9 seconds
Short answer: Prefect resolves argument values for you. Outside the task, you hold a PrefectFuture. When you pass that future as a task argument, Prefect waits for it to finish and injects its resolved value into the downstream task. That’s why inside
ps_task
you saw
previous_state is PS_Extend_General_PSwaps done
— that’s the upstream task’s result string, not a State. What’s happening step-by-step in your “previous code”: - In the flow,
prev
is a PrefectFuture right after
submit()
. If you print it there, you’ll see a future object. - When you later do
ps_task(..., previous_state=prev)
, Prefect records a dependency and, at runtime, waits for
prev
to finish. - If
prev
completes successfully (and you did not wrap with
allow_failure
), Prefect injects the upstream RESULT into the parameter — not the State — so
previous_state
becomes a string like
"PS_Extend_General_PSwaps done"
. - If
prev
fails (and you did not use
allow_failure
in
wait_for
or on the parameter), Prefect cancels the downstream task before it runs, so your task body won’t execute. How to receive the State instead of the result - Wrap the parameter value with
allow_failure(prev)
. That tells Prefect “don’t resolve this to a plain result; give me the final State object.” - If you want your task to still run even when the upstream failed, also use
allow_failure(prev)
in
wait_for
so the failure doesn’t auto-cancel the downstream. Example
Copy code
from prefect.utilities.annotations import allow_failure
from prefect.states import get_state_result

# Downstream submission
prev = ps_task.with_options(task_run_name=t).submit(
    t,
    previous_state=allow_failure(prev),   # pass State object into the task
    wait_for=[allow_failure(prev)],       # don't auto-cancel on upstream failure
)

# Inside the task
@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    if previous_state is not None:
        print(type(previous_state), repr(previous_state))  # -> <class 'prefect.states.State'> <State Completed()>
        if not previous_state.is_completed():
            return Cancelled(message="skipping due to upstream failure")

        upstream_value = get_state_result(previous_state)  # recover the prior task's return value
        print(f"Upstream result was: {upstream_value}")

    print(f"Running {name}")
    ...
    return f"{name} done"
Key points - Passing a future as a normal argument → Prefect injects the resolved result by default. - Passing
allow_failure(future)
as an argument → Prefect injects the final State object instead. - Without
allow_failure
in
wait_for
, a failed upstream cancels the downstream before it runs; with it, the downstream runs and can inspect the upstream state. Docs on states: Prefect 3 States
k
@Marvin give me me the final complete code
m
thought for 34.9 seconds
Here’s a complete working example that: - Runs groups concurrently, each group sequentially - Passes the upstream State into the next task with
allow_failure(...)
- Skips downstream tasks in a group if the upstream did not complete - Runs the final group only if all prior groups completed successfully
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.annotations import allow_failure
from prefect.states import Cancelled, get_state_result
from prefect.futures import wait
import time


# -------------------------
# Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name: str, previous_state=None, delay: int = 5):
    # If we passed an upstream future using allow_failure(prev), previous_state is a State
    if previous_state is not None:
        print(f"[{name}] previous_state: {previous_state!r}")

        # Only continue if upstream completed successfully
        if not previous_state.is_completed():
            return Cancelled(message=f"skipping {name} due to upstream not completed")

        # If you need the upstream result value, extract it
        upstream_value = get_state_result(previous_state)
        print(f"[{name}] Upstream result was: {upstream_value}")

    print(f"[{name}] Running")
    if name == "PS_MTM":
        # Example failure
        raise Exception("custom failure")

    time.sleep(delay)
    return f"{name} done"


# -------------------------
# Helper to submit a sequential group
# -------------------------
def submit_group(group: list[str]):
    """
    Submit a sequential chain of tasks and return the last future.
    The first task is submitted normally; subsequent tasks receive the upstream State.
    """
    prev = None
    for i, name in enumerate(group):
        if i == 0:
            prev = ps_task.with_options(task_run_name=name).submit(name)
        else:
            prev = ps_task.with_options(task_run_name=name).submit(
                name,
                previous_state=allow_failure(prev),   # pass State into the task
                wait_for=[allow_failure(prev)],       # don't auto-cancel; let task decide
            )
    return prev  # last task in the group


# -------------------------
# Flow
# -------------------------
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    # Define groups; each inner list runs sequentially
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM fails -> downstream in this group will be cancelled by our logic
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group runs only if all prior groups completed
    ]

    # Submit non-final groups concurrently; each group itself is sequential
    non_final_groups = client_tasks[:-1]
    last_futures_of_prior_groups = [submit_group(g) for g in non_final_groups]

    # Wait for the last task in each non-final group concurrently
    prior_states = await wait(last_futures_of_prior_groups)

    # Only run final group if all prior groups completed successfully
    if all(s.is_completed() for s in prior_states):
        final_last_future = submit_group(client_tasks[-1])
        final_state = await final_last_future.wait()
        return final_state.result() if final_state.is_completed() else None
    else:
        print("At least one prior group did not complete; skipping final group")
        return None


if __name__ == "__main__":
    client_flow()
Notes: - Since
ps_task
is synchronous and uses
time.sleep
, this uses
ThreadPoolTaskRunner
for concurrency. - If you convert
ps_task
to async (using
async def
and
await asyncio.sleep
), switch to
ConcurrentTaskRunner
instead.
k
@Marvin from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure from prefect.states import Cancelled # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_state=None,delay=20): if previous_state: print(f"previous_state is {previous_state}") if not previous_state.is_completed(): return Cancelled(message=f"skippping",name="SKIPPING") # state = previous_state.get_state() # if not state.is_completed(): # True if final # return Cancelled(message=f"skippping",name="SKIPPING") print(f"Running {name}") if name=="PS_MTM": raise Exception("custom failure") # if name=="PS_Extend_General_PSwaps": # time.sleep(20) time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): # # Your list of task groups # groups = [ # ["a", "b"], # group 1 runs a -> b # ["c", "d", "e"], # group 2 runs c -> d -> e # ["f"], # group 3 runs f # "g", # last group: runs only after all previous groups complete # ] client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] first_futures=[] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # Kick off the first task in this group; all first tasks run concurrently first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0]) prev = first_future first_futures.append(first_future) print(f"futures rae {first_futures}" ) # Chain the rest of the tasks in this group for t in group[1] prev = ps_task.with_options(task_run_name=t).submit(t,wait_for=[allow_failure(prev)], previous_state=allow_failure(prev)) # pass State object into the task) # Keep the last future in each prior group last_futures_of_prior_groups.append(prev) # Schedule the last group to start only after all prior groups have completed successfully last_group = client_tasks[-1] prev = ps_task.with_options(task_run_name=last_group).submit(last_group,wait_for=last_futures_of_prior_groups) # for t in last_group[1] # prev = ps_task.submit(t,wait_for=prev) # Optionally wait for the entire flow to finish by resolving the last future return prev.result() if name == "__main__": client_flow() thsi code is givingth eerror previous_state is 'str' object has no attribute 'is_completed' 040917 PM PS_Sweeping prefect.task_runs Task run failed with exception: AttributeError("'AttributeError' object has no attribute 'is_completed'") - Retries are exhausted Traceback (most recent call last): File "/usr/local/lib/python3.9/site-packages/prefect/task_engine.py", line 805, in run_context yield self File "/usr/local/lib/python3.9/site-packages/prefect/task_engine.py", line 1387, in run_task_sync engine.call_task_fn(txn) File "/usr/local/lib/python3.9/site-packages/prefect/task_engine.py", line 828, in call_task_fn result = call_with_parameters(self.task.fn, parameters) File "/usr/local/lib/python3.9/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "src/PrimeServicesDependency/client_changes_testing.py", line 500, in ps_task if not previous_state.is_completed(): AttributeError: 'AttributeError' object has no attribute 'is_completed' givihnng thsi error again
m
thought for 98.6 seconds
You’re seeing that error because, in some runs,
previous_state
is not a Prefect State object — it’s already a resolved value (a string) or even an exception instance. Then
previous_state.is_completed()
raises an AttributeError. Two fixes: 1) Make the downstream task robust to either a State or a resolved value 2) Keep using
allow_failure(prev)
on BOTH the parameter and
wait_for
so you normally receive a State and the task still runs even if upstream failed Here’s a complete, defensive version that won’t crash when
previous_state
is not a State: ``` from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure from prefect.states import State, Cancelled, get_state_result # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_state=None, delay=20): # Handle upstream info defensively: it might be a State, a resolved value, or even an exception if previous_state is not None: print(f"[{name}] previous_state: {previous_state!r} (type={type(previous_state)})") if isinstance(previous_state, State): # We received the upstream State (expected when using allow_failure on the param) if not previous_state.is_completed(): return Cancelled(message=f"skipping {name}: upstream not completed") # If you need the upstream result value: try: upstream_value = get_state_result(previous_state) print(f"[{name}] upstream result: {upstream_value!r}") except Exception as exc: # If the upstream failed or its result cannot be read, decide what to do return Cancelled(message=f"skipping {name}: could not read upstream result: {exc!r}") else: # We did not receive a State. Prefect resolved the value before passing it. # At this point, it's already "successful" from Prefect's perspective. # If it's an exception instance, treat it as failure; if it's a normal value, continue. if isinstance(previous_state, BaseException): return Cancelled(message=f"skipping {name}: upstream exception {previous_state!r}") # else: proceed as success print(f"Running {name}") if name == "PS_MTM": raise Exception("custom failure") time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # PS_MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # First task in the group — starts immediately prev = ps_task.with_options(task_run_name=group[0]).submit(group[0]) # Chain the rest of the tasks in this group for t in group[1] prev = ps_task.with_options(task_run_name=t).submit( t, previous_state=allow_failure(prev), # pass upstream State into the param wait_for=[allow_failure(prev)], # don't auto-cancel; let the task decide ) last_futures_of_prior_groups.append(prev) # Schedule the last task to start only after all prior groups have settled last_group = client_tasks[-1] final_future = ps_task.with_options(task_run_name=last_group).submit( last_group, wait_for=last_futures_of_prior_groups, # here we just wait; we don't need the upstream states ) # Wait for the entire flow to finish by resolving the last future
return final_future.result() if name == "__main__": client_flow() ``` Why this works - When
allow_failure(prev)
is used on the parameter, Prefect will pass the upstream State object to
previous_state
. We check
isinstance(previous_state, State)
and branch accordingly. - If Prefect ever gives you a resolved value (e.g., a string), calling
.is_completed()
would fail — the defensive checks avoid that. - Using
allow_failure(prev)
in
wait_for
ensures the downstream still runs so it can inspect the upstream State and decide to skip. If you want the final group to run only if every prior group fully completed, you can also add a gate before submitting the last task by waiting on the prior futures and checking their states — but the above matches your current pattern. Refs: - States (and helpers like
is_completed
and
get_state_result
): Prefect 3 States