hi <@ULVA73B9P> from prefect import flow, task ...
# ask-community
k
hi @Marvin from prefect import flow, task from prefect.task_runners import ThreadPoolTaskRunner import asyncio @task(log_prints=True) async def ps_task(name, delay=5): print(f"Running {name}") if name == "PS_TPLSweep_General_PSwaps": raise Exception("custom failure") if name == "PS_Extend_General_PSwaps": # extra delay for this specific task, non-blocking await asyncio.sleep(10) pass await asyncio.sleep(delay) return f"{name} done" @task(log_prints=True) def skipping_task(task_name): # Marker/log task for a skipped group return f"{task_name} group skipped" async def execute_group(group): """ Runs tasks in a group sequentially. Returns True if the entire group completed, False if it halted early due to a failure. """ previous_future = None for task_name in group: fut = ps_task.with_options(task_run_name=task_name).submit( task_name, wait_for=[previous_future] if previous_future else None, return_state=True, ) # Non-blocking wait for this task to reach a final state #state = await fut.wait() if fut.is_completed(): print(f"{task_name} completed") previous_future = fut else: print(f"{task_name} failed → skipping rest of group") skipping_task.submit(task_name) return False return True async def main(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], ["PS_LoanAccountSweeper"], ["PS_Reporting"], # final group ] # Run all but last group concurrently non_final_groups = client_tasks[:-1] tasks=[execute_group(g) for g in non_final_groups] results = await asyncio.gather(*tasks) # Run final group only if all others completed if all(results): await execute_group(client_tasks[-1]) else: print("At least one group failed; skipping final group") @flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True) async def client_flow(): # async def main(): # client_tasks = [ # ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], # ["PS_Generate", "PS_MTM", "PS_Sweeping"], # ["PS_LoanAccountSweeper"], # 
["PS_Reporting"], # final group # ] # # Run all but last group concurrently # non_final_groups = client_tasks[:-1] # tasks=[execute_group(g) for g in non_final_groups] # results = await asyncio.gather(*tasks) # # Run final group only if all others completed # if all(results): # await execute_group(client_tasks[-1]) # else: # print("At least one group failed; skipping final group") asyncio.run(main()) if name == "__main__": # Call the flow directly (do not use asyncio.run for Prefect flows) client_flow() i am unable to trigger all groups cocurrently, how can i do it in this code
m
thought for 72.7 seconds
You’re very close — two main issues are preventing your groups from actually running concurrently: - You’re checking
fut.is_completed()
immediately after
submit()
, which will almost always be False because the task hasn’t finished yet. You need to wait for the future to finish first (e.g., call
fut.wait()
, which is a synchronous call, and then inspect fut.state). - Don’t call
asyncio.run()
inside a Prefect flow. Prefect already manages the event loop for async flows. Just
await
your async helpers from the flow. Below are two ways to fix it. The second “simpler pattern” is the most Prefect-native for running groups concurrently. Option A — minimal changes to your structure - Use
ConcurrentTaskRunner
for async tasks - Await
.wait()
inside
execute_group
- Call
await main()
inside the flow instead of
asyncio.run(main())
Copy code
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import asyncio

@task(log_prints=True)
async def ps_task(name, delay=5):
    """Simulate one named unit of work.

    Prints a start message, raises a demo failure for the task named
    "PS_TPLSweep_General_PSwaps", and otherwise sleeps for ``delay`` seconds.
    The task named "PS_Extend_General_PSwaps" sleeps 10 extra seconds first.

    Returns:
        The string "<name> done".
    """
    print(f"Running {name}")

    must_fail = name == "PS_TPLSweep_General_PSwaps"
    if must_fail:
        raise Exception("custom failure")

    # Non-blocking pauses, awaited one after another; only the "extend" task
    # gets the additional 10-second pause ahead of the common delay.
    pauses = [10, delay] if name == "PS_Extend_General_PSwaps" else [delay]
    for seconds in pauses:
        await asyncio.sleep(seconds)

    return f"{name} done"

@task(log_prints=True)
def skipping_task(task_name):
    """Marker task recorded when a group is abandoned.

    Returns:
        The string "<task_name> group skipped".
    """
    marker_message = f"{task_name} group skipped"
    return marker_message

async def execute_group(group):
    """
    Run the tasks of one group strictly in order.

    Each task is submitted to the flow's task runner and this coroutine waits
    for it to reach a final state before submitting the next one. On the first
    task that does not complete successfully, a ``skipping_task`` marker is
    submitted and the rest of the group is abandoned.

    Args:
        group: Iterable of task names, in execution order.

    Returns:
        True if every task in the group completed, False if the group halted
        early because a task did not complete.
    """
    previous_future = None
    for task_name in group:
        fut = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future else None,
        )
        # PrefectFuture.wait() is a blocking, synchronous call that returns
        # None, so it cannot be awaited and does not hand back a state.
        # Running it in a worker thread keeps this coroutine non-blocking, so
        # groups started together with asyncio.gather progress concurrently.
        await asyncio.to_thread(fut.wait)
        state = fut.state
        if state.is_completed():
            print(f"{task_name} completed")
            previous_future = fut
        else:
            print(f"{task_name} failed → skipping rest of group")
            # Resolve the marker future before returning so it is not dropped
            # while the task run is still in flight.
            marker = skipping_task.submit(task_name)
            await asyncio.to_thread(marker.wait)
            return False
    return True

async def main():
    """Drive all task groups.

    Every group except the last is executed concurrently; the last group is
    executed afterwards, and only when each earlier group reported success.
    """
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]

    # Split into the groups that may overlap and the one that must run last.
    *concurrent_groups, closing_group = client_tasks

    outcomes = await asyncio.gather(
        *(execute_group(members) for members in concurrent_groups)
    )

    # Guard clause: any unsuccessful group vetoes the closing group.
    if not all(outcomes):
        print("At least one group failed; skipping final group")
        return

    await execute_group(closing_group)

@flow(task_runner=ConcurrentTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    """Flow entry point; all orchestration is delegated to ``main()``."""
    # Awaited directly: the flow is already a coroutine, so no nested
    # asyncio.run() is used here.
    await main()

if __name__ == "__main__":
    # client_flow is declared with `async def`, so calling it only creates a
    # coroutine; it has to be driven by an event loop for the flow to run.
    asyncio.run(client_flow())
Option B — simpler Prefect-native pattern (recommended) Let Prefect handle sequencing inside each group with
wait_for
, submit all groups at once, and just wait on the last task of each group concurrently. Then decide whether to run the final group. ``` from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner from prefect.futures import wait import asyncio @task(log_prints=True) async def ps_task(name, delay=5): print(f"Running {name}") if name == "PS_TPLSweep_General_PSwaps": raise Exception("custom failure") if name == "PS_Extend_General_PSwaps": await asyncio.sleep(10) await asyncio.sleep(delay) return f"{name} done" def submit_group(group): """Submit a sequential chain of tasks and return the last future.""" prev = None for name in group: fut = ps_task.with_options(task_run_name=name).submit( name, wait_for=[prev] if prev else None, ) prev = fut return prev # last task in the group
@flow(task_runner=ConcurrentTaskRunner(max_workers=8), log_prints=True) async def client_flow(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], ["PS_LoanAccountSweeper"], ["PS_Reporting"], # final group ] # Submit non-final groups concurrently; each group is internally sequential group_last_futures = [submit_group(g) for g in client_tasks[:-1]] # Wait for the last task in each non-final group concurrently states = await wait(group_last_futures) # Only run final group if all the non-final groups completed successfully if all(s.is_completed() for s in states): final_last = submit_group(client_tasks[-1]) await final_last.wait() else: print("At least one group failed; skipping final group") if name == "__main__": client_flow() ``` Notes - In Prefect 3.x,
.submit()
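returns a Prefect future. To block until it’s finished, call
fut.wait()
(a synchronous call that returns None), then read the final state from fut.state. Prefect also provides a synchronous helper to wait for many futures at once; it returns the done and not-done futures rather than states:
prefect.futures.wait([...])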
. Docs: prefect.futures.wait - Since your tasks are async (
async def
+
await asyncio.sleep
), note that in Prefect 3.x
ConcurrentTaskRunner
is only a backwards-compatible alias of
ThreadPoolTaskRunner
, so either name gives the same behavior. - Don’t use
asyncio.run()
inside a Prefect flow; just
await
your async functions from the flow.
k
@Marvin from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure from prefect.states import Cancelled # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_state=None,delay=20): if previous_state: print(f"previous_state is {previous_state}") # if not previous_state.is_completed(): # return Cancelled(message=f"skippping",name="SKIPPING") state = previous_state.get_state() if not state.is_completed(): # True if final return Cancelled(message=f"skippping",name="SKIPPING") print(f"Running {name}") if name=="PS_MTM": raise Exception("custom failure") # if name=="PS_Extend_General_PSwaps": # time.sleep(20) time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): # # Your list of task groups # groups = [ # ["a", "b"], # group 1 runs a -> b # ["c", "d", "e"], # group 2 runs c -> d -> e # ["f"], # group 3 runs f # "g", # last group: runs only after all previous groups complete # ] client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # Kick off the first task in this group; all first tasks run concurrently first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0]) prev = first_future # Chain the rest of the tasks in this group for t in group[1] prev = ps_task.with_options(task_run_name=t).submit(t,wait_for=[allow_failure(prev)],previous_state=prev) # Keep the last future in each prior group last_futures_of_prior_groups.append(prev) # Schedule the last group to start only after all prior 
groups have completed successfully last_group = client_tasks[-1] prev = ps_task.with_options(task_run_name=last_group).submit(last_group,wait_for=last_futures_of_prior_groups) # for t in last_group[1] # prev = ps_task.submit(t,wait_for=prev) # Optionally wait for the entire flow to finish by resolving the last future return prev.result() if name == "__main__": client_flow() in ps_task , i want to chck fi teh previous future is coompleted succesfully, how to do that,in the print statemet of ps_task iam getting previous_state is PS_Extend_General_PSwaps done, can i use thsi to check fi it is a successfuly completed or failed?
m
thought for 185.3 seconds
Great question — right now your
previous_state
parameter is receiving the previous task’s RESULT (a string like
PS_Extend_General_PSwaps done
), not its State. That’s why your print shows the string and why calling
.get_state()
won’t work. To inspect whether the upstream task completed or failed inside
ps_task
, pass the upstream future with
allow_failure(...)
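as the argument value. Note that this does not deliver a State object: the task still receives the resolved result, and if the upstream failed it receives the upstream exception object instead, so check for an exception instance inside the task. You’ll also want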
allow_failure
in
wait_for
so Prefect doesn’t auto-cancel and you can decide in-task. What to change in your flow submission loop
Copy code
# Kick off the first task in this group
first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0])
prev = first_future

# Chain the rest of the tasks in this group
for t in group[1:]:
    prev = ps_task.with_options(task_run_name=t).submit(
        t,
        previous_state=allow_failure(prev),   # <-- task receives the upstream result, or its exception if it failed (not a State)
        wait_for=[allow_failure(prev)],       # <-- don't auto-cancel; let task decide
    )
How to check the upstream state in ps_task
Copy code
from prefect import task
from prefect.states import Cancelled, get_state_result

@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    """Run one named step, skipping it when the upstream step did not succeed.

    Args:
        name: Task name; "PS_MTM" raises a deliberate demo failure.
        previous_state: Outcome of the upstream task. When the caller passes
            ``allow_failure(future)``, this arrives as the upstream *result*
            on success or as the upstream *exception instance* on failure.
            A State object is also accepted if one is passed explicitly.
        delay: Seconds to sleep to simulate work.

    Returns:
        "<name> done" on success, or a Cancelled state when the upstream did
        not complete successfully.
    """
    if previous_state is not None:
        print(f"previous_state: {previous_state!r}")

        # Upstream failure is delivered as an exception object, not a State.
        if isinstance(previous_state, BaseException):
            return Cancelled(message="skipping due to upstream state")

        if callable(getattr(previous_state, "is_completed", None)):
            # A real State was handed in explicitly; honour it.
            if not previous_state.is_completed():
                return Cancelled(message="skipping due to upstream state")
            upstream_value = get_state_result(previous_state)
        else:
            # Already-resolved upstream result (e.g. "<name> done").
            upstream_value = previous_state
        print(f"Upstream result was: {upstream_value}")

    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
Alternative: if you don’t need to inspect the upstream state (you just want to skip on failure), you can drop the
previous_state
parameter entirely and rely on
wait_for=[prev]
without
allow_failure
. Prefect will automatically cancel downstream tasks when an upstream dependency fails. Refs: - States and state checks (
is_completed
,
is_failed
, etc.): Prefect 3 States - Getting a result from a State: get_state_result
k
@Marvin How is the future being converted into a state here? As soon as the task is submitted and I print the future, I get the future object (for the first tasks in the groups), but when I pass the same variable to the next tasks in the group, it is converted to PS_Extend_General_PSwaps done. How? (This is in the previous code, not after the changes you just suggested.)
m
thought for 30.9 seconds
Short answer: Prefect resolves argument values for you. Outside the task, you hold a PrefectFuture. When you pass that future as a task argument, Prefect waits for it to finish and injects its resolved value into the downstream task. That’s why inside
ps_task
you saw
previous_state is PS_Extend_General_PSwaps done
— that’s the upstream task’s result string, not a State. What’s happening step-by-step in your “previous code”: - In the flow,
prev
is a PrefectFuture right after
submit()
. If you print it there, you’ll see a future object. - When you later do
ps_task(..., previous_state=prev)
, Prefect records a dependency and, at runtime, waits for
prev
to finish. - If
prev
completes successfully (and you did not wrap with
allow_failure
), Prefect injects the upstream RESULT into the parameter — not the State — so
previous_state
becomes a string like
"PS_Extend_General_PSwaps done"
. - If
prev
fails (and you did not use
allow_failure
in
wait_for
or on the parameter), Prefect cancels the downstream task before it runs, so your task body won’t execute. How to receive the State instead of the result - Wrap the parameter value with
allow_failure(prev)
. Be aware that the task then receives the upstream result on success, or the upstream exception object on failure — not a State object. - If you want your task to still run even when the upstream failed, also use
allow_failure(prev)
in
wait_for
so the failure doesn’t auto-cancel the downstream. Example
Copy code
from prefect.utilities.annotations import allow_failure
from prefect.states import get_state_result

# Downstream submission
prev = ps_task.with_options(task_run_name=t).submit(
    t,
    previous_state=allow_failure(prev),   # task receives the upstream result, or its exception if it failed (not a State)
    wait_for=[allow_failure(prev)],       # don't auto-cancel on upstream failure
)

# Inside the task
@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    """Example task that skips itself when the upstream step failed.

    ``previous_state`` is what Prefect resolves ``allow_failure(future)`` to:
    the upstream result on success, or the upstream exception instance on
    failure. A State object is also tolerated if one is passed explicitly.
    """
    if previous_state is not None:
        # Typically prints <class 'str'> on success or an exception class on failure.
        print(type(previous_state), repr(previous_state))

        if isinstance(previous_state, BaseException):
            return Cancelled(message="skipping due to upstream failure")

        if callable(getattr(previous_state, "is_completed", None)):
            if not previous_state.is_completed():
                return Cancelled(message="skipping due to upstream failure")
            upstream_value = get_state_result(previous_state)  # recover the prior task's return value
        else:
            upstream_value = previous_state  # already the prior task's return value
        print(f"Upstream result was: {upstream_value}")

    print(f"Running {name}")
    ...
    return f"{name} done"
Key points - Passing a future as a normal argument → Prefect injects the resolved result by default. - Passing
allow_failure(future)
as an argument → Prefect injects the resolved result on success, or the upstream exception object on failure (not a State). - Without
allow_failure
in
wait_for
, a failed upstream cancels the downstream before it runs; with it, the downstream runs and can inspect the upstream state. Docs on states: Prefect 3 States
k
@Marvin give me me the final complete code
m
thought for 34.9 seconds
Here’s a complete working example that: - Runs groups concurrently, each group sequentially - Passes the upstream outcome (its result, or its exception on failure) into the next task with
allow_failure(...)
- Skips downstream tasks in a group if the upstream did not complete - Runs the final group only if all prior groups completed successfully
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.annotations import allow_failure
from prefect.states import Cancelled, get_state_result
from prefect.futures import wait
import time


# -------------------------
# Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name: str, previous_state=None, delay: int = 5):
    """Run one named step, skipping it when the upstream step did not succeed.

    Args:
        name: Task name; "PS_MTM" raises a deliberate example failure.
        previous_state: Outcome of the upstream task. When submitted with
            ``allow_failure(prev)`` this is the upstream *result* on success
            or the upstream *exception instance* on failure. A State object
            is also accepted if one is passed explicitly.
        delay: Seconds to sleep to simulate work.

    Returns:
        "<name> done" on success, or a Cancelled state when the upstream did
        not complete successfully.
    """
    if previous_state is not None:
        print(f"[{name}] previous_state: {previous_state!r}")

        # A failed upstream arrives as its exception object, so calling
        # State methods on it would raise AttributeError.
        if isinstance(previous_state, BaseException):
            return Cancelled(message=f"skipping {name} due to upstream not completed")

        if callable(getattr(previous_state, "is_completed", None)):
            # A real State was handed in explicitly; only continue if it completed.
            if not previous_state.is_completed():
                return Cancelled(message=f"skipping {name} due to upstream not completed")
            upstream_value = get_state_result(previous_state)
        else:
            # Already-resolved upstream result (e.g. "<name> done").
            upstream_value = previous_state
        print(f"[{name}] Upstream result was: {upstream_value}")

    print(f"[{name}] Running")
    if name == "PS_MTM":
        # Example failure
        raise Exception("custom failure")

    time.sleep(delay)
    return f"{name} done"


# -------------------------
# Helper to submit a sequential group
# -------------------------
def submit_group(group: list[str]):
    """
    Submit the tasks of ``group`` as a sequential chain.

    The head of the group is submitted with no upstream. Every later task is
    submitted with the previous future wrapped in ``allow_failure`` both as
    its ``previous_state`` argument and as its ``wait_for`` dependency.

    Returns the future of the last submitted task, or None for an empty group.
    """
    names = iter(group)
    head = next(names, None)
    if head is None:
        return None

    tail_future = ps_task.with_options(task_run_name=head).submit(head)
    for follower in names:
        chained_task = ps_task.with_options(task_run_name=follower)
        tail_future = chained_task.submit(
            follower,
            # upstream outcome is handed to the task body for inspection
            previous_state=allow_failure(tail_future),
            # keep the follower runnable even if the upstream failed
            wait_for=[allow_failure(tail_future)],
        )
    return tail_future


# -------------------------
# Flow
# -------------------------
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    """Run all non-final groups concurrently, then the final group if they all succeeded.

    Returns:
        The result of the last task of the final group when it completed,
        otherwise None (final group skipped or not completed).
    """
    # Local import: the surrounding snippet does not import asyncio at the top.
    import asyncio

    # Define groups; each inner list runs sequentially
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM fails -> downstream in this group will be cancelled by our logic
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group runs only if all prior groups completed
    ]

    # Submit non-final groups concurrently; each group itself is sequential
    non_final_groups = client_tasks[:-1]
    last_futures_of_prior_groups = [submit_group(g) for g in non_final_groups]

    # prefect.futures.wait() is synchronous and returns (done, not_done)
    # futures, not states. Run it in a worker thread so the flow's event loop
    # is not blocked, then read each future's final state.
    await asyncio.to_thread(wait, last_futures_of_prior_groups)
    prior_states = [fut.state for fut in last_futures_of_prior_groups]

    # Only run final group if all prior groups completed successfully
    if not all(s.is_completed() for s in prior_states):
        print("At least one prior group did not complete; skipping final group")
        return None

    final_last_future = submit_group(client_tasks[-1])
    # PrefectFuture.wait() is likewise synchronous and returns None.
    await asyncio.to_thread(final_last_future.wait)
    if not final_last_future.state.is_completed():
        return None
    # Resolve the value through the future, off the event loop.
    return await asyncio.to_thread(final_last_future.result)


if __name__ == "__main__":
    import asyncio

    # client_flow is declared with `async def`, so calling it only creates a
    # coroutine; it has to be driven by an event loop for the flow to run.
    asyncio.run(client_flow())
Notes: - Since
ps_task
is synchronous and uses
time.sleep
, this uses
ThreadPoolTaskRunner
for concurrency. - If you convert
ps_task
to async (using
async def
and
await asyncio.sleep
), you can keep the same task runner;
ConcurrentTaskRunner
is only a backwards-compatible alias of ThreadPoolTaskRunner in Prefect 3.x.
k
@Marvin from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure from prefect.states import Cancelled # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_state=None,delay=20): if previous_state: print(f"previous_state is {previous_state}") if not previous_state.is_completed(): return Cancelled(message=f"skippping",name="SKIPPING") # state = previous_state.get_state() # if not state.is_completed(): # True if final # return Cancelled(message=f"skippping",name="SKIPPING") print(f"Running {name}") if name=="PS_MTM": raise Exception("custom failure") # if name=="PS_Extend_General_PSwaps": # time.sleep(20) time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): # # Your list of task groups # groups = [ # ["a", "b"], # group 1 runs a -> b # ["c", "d", "e"], # group 2 runs c -> d -> e # ["f"], # group 3 runs f # "g", # last group: runs only after all previous groups complete # ] client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] first_futures=[] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # Kick off the first task in this group; all first tasks run concurrently first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0]) prev = first_future first_futures.append(first_future) print(f"futures rae {first_futures}" ) # Chain the rest of the tasks in this group for t in group[1] prev = ps_task.with_options(task_run_name=t).submit(t,wait_for=[allow_failure(prev)], previous_state=allow_failure(prev)) # pass State object into the task) 
# Keep the last future in each prior group last_futures_of_prior_groups.append(prev) # Schedule the last group to start only after all prior groups have completed successfully last_group = client_tasks[-1] prev = ps_task.with_options(task_run_name=last_group).submit(last_group,wait_for=last_futures_of_prior_groups) # for t in last_group[1] # prev = ps_task.submit(t,wait_for=prev) # Optionally wait for the entire flow to finish by resolving the last future return prev.result() if name == "__main__": client_flow() thsi code is givingth eerror previous_state is 'str' object has no attribute 'is_completed' 040917 PM PS_Sweeping prefect.task_runs Task run failed with exception: AttributeError("'AttributeError' object has no attribute 'is_completed'") - Retries are exhausted Traceback (most recent call last): File "/usr/local/lib/python3.9/site-packages/prefect/task_engine.py", line 805, in run_context yield self File "/usr/local/lib/python3.9/site-packages/prefect/task_engine.py", line 1387, in run_task_sync engine.call_task_fn(txn) File "/usr/local/lib/python3.9/site-packages/prefect/task_engine.py", line 828, in call_task_fn result = call_with_parameters(self.task.fn, parameters) File "/usr/local/lib/python3.9/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters return fn(*args, **kwargs) File "src/PrimeServicesDependency/client_changes_testing.py", line 500, in ps_task if not previous_state.is_completed(): AttributeError: 'AttributeError' object has no attribute 'is_completed' givihnng thsi error again
m
thought for 98.6 seconds
You’re seeing that error because, in some runs,
previous_state
is not a Prefect State object — it’s already a resolved value (a string) or even an exception instance. Then
previous_state.is_completed()
raises an AttributeError. Two fixes: 1) Make the downstream task robust to either a State or a resolved value 2) Keep using
allow_failure(prev)
on BOTH the parameter and
wait_for
so you normally receive a State and the task still runs even if upstream failed Here’s a complete, defensive version that won’t crash when
previous_state
is not a State: ``` from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure from prefect.states import State, Cancelled, get_state_result # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_state=None, delay=20): # Handle upstream info defensively: it might be a State, a resolved value, or even an exception if previous_state is not None: print(f"[{name}] previous_state: {previous_state!r} (type={type(previous_state)})") if isinstance(previous_state, State): # We received the upstream State (expected when using allow_failure on the param) if not previous_state.is_completed(): return Cancelled(message=f"skipping {name}: upstream not completed") # If you need the upstream result value: try: upstream_value = get_state_result(previous_state) print(f"[{name}] upstream result: {upstream_value!r}") except Exception as exc: # If the upstream failed or its result cannot be read, decide what to do return Cancelled(message=f"skipping {name}: could not read upstream result: {exc!r}") else: # We did not receive a State. Prefect resolved the value before passing it. # At this point, it's already "successful" from Prefect's perspective. # If it's an exception instance, treat it as failure; if it's a normal value, continue. 
if isinstance(previous_state, BaseException): return Cancelled(message=f"skipping {name}: upstream exception {previous_state!r}") # else: proceed as success print(f"Running {name}") if name == "PS_MTM": raise Exception("custom failure") time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # PS_MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # First task in the group — starts immediately prev = ps_task.with_options(task_run_name=group[0]).submit(group[0]) # Chain the rest of the tasks in this group for t in group[1] prev = ps_task.with_options(task_run_name=t).submit( t, previous_state=allow_failure(prev), # pass upstream State into the param wait_for=[allow_failure(prev)], # don't auto-cancel; let the task decide ) last_futures_of_prior_groups.append(prev) # Schedule the last task to start only after all prior groups have settled last_group = client_tasks[-1] final_future = ps_task.with_options(task_run_name=last_group).submit( last_group, wait_for=last_futures_of_prior_groups, # here we just wait; we don't need the upstream states ) # Wait for the entire flow to finish by resolving the last future
return final_future.result() if name == "__main__": client_flow() ``` Why this works - When
allow_failure(prev)
is used on the parameter, Prefect passes the upstream result — or, when the upstream failed, its exception object — to
previous_state
. We check
isinstance(previous_state, State)
and branch accordingly. - If Prefect ever gives you a resolved value (e.g., a string), calling
.is_completed()
would fail — the defensive checks avoid that. - Using
allow_failure(prev)
in
wait_for
ensures the downstream still runs so it can inspect the upstream State and decide to skip. If you want the final group to run only if every prior group fully completed, you can also add a gate before submitting the last task by waiting on the prior futures and checking their states — but the above matches your current pattern. Refs: - States (and helpers like
is_completed
and
get_state_result
): Prefect 3 States
k
@Marvin get_state_result takes a future or a state?
m
thought for 84.4 seconds
State. In Prefect 3.x,
prefect.states.get_state_result
expects a
State
, not a
PrefectFuture
. If you have a future, either: - call
future.result()
directly, or - get its state with
future.state
and pass that to
get_state_result
. Example:
Copy code
from prefect.states import get_state_result

future = some_task.submit(1)

# Easiest:
value = future.result()

# Or via state:
state = future.state
value_from_state = get_state_result(state)
If you have nested structures of futures,
prefect.futures.resolve_futures_to_results(expr)
can resolve them in-place. Source: prefect.states.get_state_result source
k
@Marvin from prefect.states import get_state_result future = some_task.submit(1) # Easiest: value = future.result() # Or via state: state = future.state value_from_state = get_state_result(state) here, how can i check if teh state is failed?
m
thought for 150.0 seconds
Use the state’s convenience predicate. - Preferred:
Copy code
from prefect.states import get_state_exception

if state.is_failed():
    exc = get_state_exception(state)  # optional: inspect the exception
    print("Task failed:", exc)
- Or compare the type:
Copy code
from prefect.client.schemas.objects import StateType

if state.type == StateType.FAILED:
    ...
For reference: prefect.states source
k
@Marvin
Copy code
from prefect.states import get_state_exception

if state.is_failed():
    exc = get_state_exception(state)  # optional: inspect the exception
    print("Task failed:", exc)
I need to check whether the task has reached a final state and failed.
m
thought for 37.4 seconds
If you need to ensure it’s in the terminal Failed state, you can either: - Wait for finalization (recommended when you have the future), then check:
Copy code
from prefect.states import get_state_exception

future.wait()  # blocks until the run reaches a final state
state = future.state
if state.is_failed():
    exc = get_state_exception(state)
    print("Task failed:", exc)
- Or, if you only have a state object and are polling/checking:
Copy code
from prefect.client.schemas.objects import StateType

if state.is_final() and state.type == StateType.FAILED:
    ...
Note:
state.is_failed()
only returns True for the terminal Failed state. If the run is still Retrying/AwaitingRetry/etc., it won’t be considered failed yet. For reference: prefect.states source
k
@Marvin if state.is_failed(): exc = get_state_exception(state) print("Task failed:", exc) Does this throw an error if the task is in a Running state? I mean the if condition.
m
thought for 30.9 seconds
No.
state.is_failed()
just returns False for a Running state (and for Pending, Retrying, etc.) and does not raise.
get_state_exception(state)
will raise if the state is not Failed or Crashed — but in your snippet it’s only called when
is_failed()
is True, so you’re safe. Example pattern:
Copy code
from prefect.states import get_state_exception

if state.is_failed():
    exc = get_state_exception(state)
    print("Task failed:", exc)
elif not state.is_final():
    print(f"Task not finished yet; current state: {state.name}")
Reference: prefect.states source
k
@Marvin client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # MTM will fail ["PS_LoanAccountSweeper"], ["PS_Reporting"] # final task ] teh list canb be dynamic , all the tasks in the sublist are depenedent on teh previosu one, if any of teh task failes, all other tasks in eth groups shoul d be shown as skipping in the UI, with clear sequential arrow marks for tasksin tge group, all the gropus should run concurrently except the last group, after all the groups are completed , last group should trigger,if all teh previous groups are succesful, then teh last task/list should excecute, if any of teh before groups are failed/ skipped, teh me teh lat task/list shoul be skipped as well, how to do that
m
🔄 Researching your answer... this may take a while 🔧 Using tool:
explore_module_offerings
📊 Tools used: •
display_callable_signature
(3x) •
explore_module_offerings
(4x) •
research_prefect_topic
(4x)
thought for 310.9 seconds
Got it — you want: - A dynamic list of task-groups - Tasks within each group run sequentially (with arrows in the UI) - Groups run concurrently, except the last group - If a task fails in a group, the remaining tasks in that group should not run and appear as skipped/cancelled in the UI - The final group runs only if all prior groups succeeded; otherwise it’s skipped/cancelled In Prefect 3.x, you can do this by: - Chaining tasks within each group by passing the previous task’s future into the next one; this creates explicit edges in the UI and enforces order - Starting the first task of each group at the same time so groups run concurrently - Using a “barrier” task that depends on the last task of every prior group; the first task of your final group depends on the barrier so it only runs if all prior groups completed successfully - Letting Prefect automatically cancel downstream tasks when an upstream fails (they’ll show as grey “Cancelled” in the UI, which is the canonical “skipped due to upstream failure” state) Here’s a complete example you can adapt to your task names. This includes one failing task (
PS_MTM
) to demonstrate skipping/cancellation in that group and the final group. ``` from typing import List, Dict, Callable, Any from prefect import flow, task, get_run_logger from prefect.task_runners import ThreadPoolTaskRunner # ---- Define your tasks (each accepts an optional "upstream" it ignores, used only to create UI edges) ---- @task def PS_Extend_General_PSwaps(_upstream: Any = None): get_run_logger().info("Extend general PSwaps") return "extend-ok" @task def PS_TPLSweep_General_PSwaps(_upstream: Any = None): get_run_logger().info("TPL sweep for general PSwaps") return "tplsweep-ok" @task def PS_Generate(_upstream: Any = None): get_run_logger().info("Generate") return "generate-ok" @task def PS_MTM(_upstream: Any = None): get_run_logger().info("MTM - going to fail for demo") raise RuntimeError("MTM failed") @task def PS_Sweeping(_upstream: Any = None): get_run_logger().info("Sweeping") return "sweeping-ok" @task def PS_LoanAccountSweeper(_upstream: Any = None): get_run_logger().info("Loan account sweep") return "loan-sweeper-ok" @task def PS_Reporting(_upstream: Any = None): get_run_logger().info("Final reporting") return "reporting-ok" # A simple barrier task that only exists to create a dependency on multiple upstreams @task def barrier(*_deps: Any) -> bool: return True # Registry to map your string names to task callables TASKS: Dict[str, Callable[..., Any]] = { "PS_Extend_General_PSwaps": PS_Extend_General_PSwaps, "PS_TPLSweep_General_PSwaps": PS_TPLSweep_General_PSwaps, "PS_Generate": PS_Generate, "PS_MTM": PS_MTM, # This fails in this demo "PS_Sweeping": PS_Sweeping, "PS_LoanAccountSweeper": PS_LoanAccountSweeper, "PS_Reporting": PS_Reporting, } @flow(name="client-orchestrator", task_runner=ThreadPoolTaskRunner()) def orchestrate(client_tasks: List[List[str]]): """ - All groups except the last run concurrently. - Tasks inside each group run strictly in sequence (UI shows arrows). 
- If a task fails in a group, Prefect cancels the remaining tasks in that group (grey 'Cancelled' in UI). - The last group runs only if all prior groups completed successfully; otherwise it is cancelled. """ # Kick off all groups except the last in parallel last_futures_of_prior_groups = [] for group in client_tasks[:-1]: prev = None for idx, task_name in enumerate(group): t = TASKS[task_name] prev = t(prev) if idx > 0 else t() # chain on previous future to enforce order and create arrows last_futures_of_prior_groups.append(prev) # Create a barrier that waits on the last task of every prior group barrier_future = barrier(*last_futures_of_prior_groups)
# Run the final group only after the barrier; if any prior group failed/cancelled, the barrier will be cancelled, # and the entire final group will be cancelled too (i.e., effectively "skipped") final_group = client_tasks[-1] prev = None for idx, task_name in enumerate(final_group): t = TASKS[task_name] if idx == 0: prev = t(barrier_future) # first task in final group waits on the barrier else: prev = t(prev) # continue chaining inside final group # Optionally return the futures/states if you want to inspect them return { "prior_groups_last_tasks": last_futures_of_prior_groups, "final_group_last_task": prev, } if __name__ == "__main__": client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # MTM will fail -> Sweeping will be cancelled ["PS_LoanAccountSweeper"], ["PS_Reporting"], # final group; will be cancelled because one prior group failed ] orchestrate(client_tasks) ``` Notes and behaviors: - The UI will show sequential arrows within each group because each task depends on the previous task’s future. - When one task fails in a group, the rest of that group will show as Cancelled (grey) due to upstream failure. This is the standard Prefect behavior for “skipped because upstream failed”. - The “barrier” ensures the final group runs only if all prior groups completed successfully. If any prior group failed/cancelled, the barrier is cancelled and the final group is cancelled. - Groups run concurrently because we start the first task of each group without waiting for others. The
ThreadPoolTaskRunner
makes this explicit. Follow-ups: - Do you want the final group to run even if a prior group had a failure but you consider that acceptable? If so, we can wrap dependencies in
prefect.allow_failure()
selectively. - Do any tasks need to pass real data to the next one? If yes, we’ll thread their outputs instead of the dummy
_upstream
argument above.
k
@Marvin use only one function (with the task decorator) for executing all the tasks
m
thought for 111.4 seconds
Great — here’s a pattern that uses a single @task function to execute every step, while preserving: - Sequential arrows within each group - Concurrent groups (except the final group) - Automatic cancellation of remaining steps in a group if one fails - Final group only runs if all prior groups completed successfully How it works: - One generic task
run_step(name, ...)
is reused for all steps - Real work is implemented by plain Python handler functions (no extra @task) - A “barrier” is just another call to the same
run_step
with name "BARRIER" that depends on all prior groups’ last steps Note: In the UI, tasks that are not run because an upstream failed will show as Cancelled (grey). That’s Prefect’s canonical “skipped due to upstream failure.” ``` from typing import Any, Callable, Dict, List, Optional from prefect import flow, task, get_run_logger from prefect.task_runners import ThreadPoolTaskRunner # ---- Plain Python handlers for your actual work (no @task here) ---- def do_PS_Extend_General_PSwaps(upstream: Any = None, **kwargs) -> Any: get_run_logger().info("Extend general PSwaps") return "extend-ok" def do_PS_TPLSweep_General_PSwaps(upstream: Any = None, **kwargs) -> Any: get_run_logger().info("TPL sweep for general PSwaps") return "tplsweep-ok" def do_PS_Generate(upstream: Any = None, **kwargs) -> Any: get_run_logger().info("Generate") return "generate-ok" def do_PS_MTM(upstream: Any = None, **kwargs) -> Any: get_run_logger().info("MTM - failing for demo") raise RuntimeError("MTM failed") def do_PS_Sweeping(upstream: Any = None, **kwargs) -> Any: get_run_logger().info("Sweeping") return "sweeping-ok" def do_PS_LoanAccountSweeper(upstream: Any = None, **kwargs) -> Any: get_run_logger().info("Loan account sweep") return "loan-sweeper-ok" def do_PS_Reporting(upstream: Any = None, **kwargs) -> Any: get_run_logger().info("Final reporting") return "reporting-ok" def do_BARRIER(upstream: Any = None, deps: Optional[List[Any]] = None, **kwargs) -> bool: # This exists just to depend on all prior groups' last steps. # If any dep failed/cancelled, this task will be cancelled by Prefect. 
get_run_logger().info("Barrier passed") return True # ---- Registry that maps names to handler functions ---- HANDLERS: Dict[str, Callable[..., Any]] = { "PS_Extend_General_PSwaps": do_PS_Extend_General_PSwaps, "PS_TPLSweep_General_PSwaps": do_PS_TPLSweep_General_PSwaps, "PS_Generate": do_PS_Generate, "PS_MTM": do_PS_MTM, # Fails in this demo "PS_Sweeping": do_PS_Sweeping, "PS_LoanAccountSweeper": do_PS_LoanAccountSweeper, "PS_Reporting": do_PS_Reporting, "BARRIER": do_BARRIER, } # ---- One task function reused for all steps ---- @task(task_run_name=lambda name, **_: f"{name}") def run_step( name: str, upstream: Any = None, # used only to create a dependency arrow deps: Optional[List[Any]] = None, # pass a list of upstream futures for fan-in dependencies (barrier) params: Optional[Dict[str, Any]] = None, ) -> Any: handler = HANDLERS.get(name) if not handler: raise ValueError(f"Unknown step name: {name}") # Pass through upstream/deps/params for handlers that want them return handler(upstream=upstream, deps=deps, **(params or {})) @flow(name="client-orchestrator", task_runner=ThreadPoolTaskRunner()) def orchestrate( client_tasks: List[List[str]], params_by_step: Optional[Dict[str, Dict[str, Any]]] = None, ): """ - All groups except the last run concurrently. - Steps inside each group run in strict sequence (UI shows arrows). - If a step fails, remaining steps in that group are cancelled. - Final group runs only if all prior groups complete successfully; otherwise it is cancelled. """ params_by_step = params_by_step or {} # Start all groups except the last in parallel last_futures = [] for group in client_tasks[:-1]:
prev = None for i, step in enumerate(group): step_params = params_by_step.get(step) if i == 0: prev = run_step(step, params=step_params) # first step in group else: prev = run_step(step, upstream=prev, params=step_params) # chain to create arrows last_futures.append(prev) # Barrier depends on all prior groups' last steps barrier_future = run_step("BARRIER", deps=last_futures) # Final group runs only if barrier runs (i.e., all prior groups succeeded) final_group = client_tasks[-1] prev = None for i, step in enumerate(final_group): step_params = params_by_step.get(step) if i == 0: prev = run_step(step, upstream=barrier_future, params=step_params) else: prev = run_step(step, upstream=prev, params=step_params) return { "prior_groups_last_tasks": last_futures, "final_group_last_task": prev, } if __name__ == "__main__": client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # MTM fails -> Sweeping cancelled ["PS_LoanAccountSweeper"], ["PS_Reporting"], # final group; will be cancelled because one prior group failed ] orchestrate(client_tasks) ``` Does this interpretation match what you meant by “only one task function”? If you instead meant “a single task run for the entire pipeline,” that would remove the per-step arrows and cancellation visuals in the UI. The above approach keeps one reusable task function while still giving you per-step visibility and dependencies.