Kiran 09/24/2025, 6:41 AM
Marvin 09/24/2025, 6:42 AM
Marvin 09/24/2025, 6:43 AM
- You're checking fut.is_completed() immediately after submit(), which will almost always be False because the task hasn't finished yet. You need to await the future (e.g., await fut.wait()).
- Don't call asyncio.run() inside a Prefect flow. Prefect already manages the event loop for async flows. Just await your async helpers from the flow.
Below are two ways to fix it. The second "simpler pattern" is the most Prefect-native for running groups concurrently.
Option A — minimal changes to your structure
- Use ConcurrentTaskRunner for async tasks
- Await .wait() inside execute_group
- Call await main() inside the flow instead of asyncio.run(main())
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import asyncio


@task(log_prints=True)
async def ps_task(name, delay=5):
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        # extra delay for this specific task, non-blocking
        await asyncio.sleep(10)
    await asyncio.sleep(delay)
    return f"{name} done"


@task(log_prints=True)
def skipping_task(task_name):
    # Marker/log task for a skipped group
    return f"{task_name} group skipped"


async def execute_group(group):
    """
    Runs tasks in a group sequentially.
    Returns True if the entire group completed, False if it halted early due to a failure.
    """
    previous_future = None
    for task_name in group:
        fut = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future else None,
        )
        # Wait for this task to complete so we can decide whether to continue
        state = await fut.wait()
        if state.is_completed():
            print(f"{task_name} completed")
            previous_future = fut
        else:
            print(f"{task_name} failed → skipping rest of group")
            skipping_task.submit(task_name)
            return False
    return True


async def main():
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]
    # Run all but last group concurrently
    non_final_groups = client_tasks[:-1]
    results = await asyncio.gather(*(execute_group(g) for g in non_final_groups))
    # Run final group only if all others completed
    if all(results):
        await execute_group(client_tasks[-1])
    else:
        print("At least one group failed; skipping final group")


@flow(task_runner=ConcurrentTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    await main()


if __name__ == "__main__":
    client_flow()
```
Option B — simpler Prefect-native pattern (recommended)
Let Prefect handle sequencing inside each group with wait_for, submit all groups at once, and just wait on the last task of each group concurrently. Then decide whether to run the final group.
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.futures import wait
import asyncio


@task(log_prints=True)
async def ps_task(name, delay=5):
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    if name == "PS_Extend_General_PSwaps":
        await asyncio.sleep(10)
    await asyncio.sleep(delay)
    return f"{name} done"


def submit_group(group):
    """Submit a sequential chain of tasks and return the last future."""
    prev = None
    for name in group:
        fut = ps_task.with_options(task_run_name=name).submit(
            name,
            wait_for=[prev] if prev else None,
        )
        prev = fut
    return prev  # last task in the group
```
Marvin 09/24/2025, 6:43 AM
- .submit() returns a Prefect future. To block until it's finished and get its final state, use await fut.wait(). Prefect also provides a helper to wait for many futures concurrently: await prefect.futures.wait([...]). Docs: prefect.futures.wait
- Since your tasks are async (async def + await asyncio.sleep), ConcurrentTaskRunner is a better fit than ThreadPoolTaskRunner.
- Don't use asyncio.run() inside a Prefect flow; just await your async functions from the flow.
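For illustration, a minimal sketch of those calls following the usage above (double and demo_flow are made-up names; the exact await semantics of future helpers can differ between Prefect versions):
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.futures import wait
import asyncio


@task
async def double(n: int):
    # Placeholder async work
    await asyncio.sleep(1)
    return n * 2


@flow(task_runner=ConcurrentTaskRunner())
async def demo_flow():
    # submit() returns a PrefectFuture immediately; the task hasn't finished yet
    fut = double.submit(2)

    # Block until this task settles and inspect its final state
    state = await fut.wait()
    print(state.is_completed())

    # Wait on several futures concurrently with the helper from prefect.futures
    futures = [double.submit(i) for i in range(3)]
    await wait(futures)
```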
Kiran 09/24/2025, 10:15 AM
Marvin 09/24/2025, 10:15 AM
Marvin 09/24/2025, 10:18 AM
Your previous_state parameter is receiving the previous task's RESULT (a string like "PS_Extend_General_PSwaps done"), not its State. That's why your print shows the string and why calling .get_state() won't work.
To inspect whether the upstream task completed or failed inside ps_task, pass the upstream future with allow_failure(...) as the argument value. That tells Prefect to deliver the upstream State object to your task argument instead of the resolved result. You'll also want allow_failure in wait_for so Prefect doesn't auto-cancel and you can decide in-task.
What to change in your flow submission loop
```
# Kick off the first task in this group
first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0])
prev = first_future

# Chain the rest of the tasks in this group
for t in group[1:]:
    prev = ps_task.with_options(task_run_name=t).submit(
        t,
        previous_state=allow_failure(prev),  # <-- pass State into the task
        wait_for=[allow_failure(prev)],      # <-- don't auto-cancel; let task decide
    )
```
How to check the upstream state in ps_task
```
import time

from prefect import task
from prefect.states import Cancelled, get_state_result


@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    if previous_state is not None:
        # previous_state is a State when you pass allow_failure(prev)
        print(f"previous_state: {previous_state!r}")  # e.g. <State Completed()>
        if not previous_state.is_completed():
            return Cancelled(message="skipping due to upstream state")
        # If you need the upstream result value:
        upstream_value = get_state_result(previous_state)
        print(f"Upstream result was: {upstream_value}")
    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
```
Alternative: if you don't need to inspect the upstream state (you just want to skip on failure), you can drop the previous_state parameter entirely and rely on wait_for=[prev] without allow_failure. Prefect will automatically cancel downstream tasks when an upstream dependency fails, as in the sketch below.
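A minimal sketch of that simpler chaining, assuming the cancel-on-upstream-failure behavior described above (the step task and names are placeholders):
```
from prefect import flow, task


@task(log_prints=True)
def step(name: str):
    if name == "step_b":
        raise Exception("boom")
    return f"{name} done"


@flow(log_prints=True)
def chained_group_flow():
    prev = None
    for name in ["step_a", "step_b", "step_c"]:
        # No allow_failure here: per the note above, a failed upstream in
        # wait_for means Prefect cancels the downstream task automatically,
        # so step_c should not run once step_b fails.
        prev = step.with_options(task_run_name=name).submit(
            name,
            wait_for=[prev] if prev else None,
        )
```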
Refs:
- States and state checks (is_completed, is_failed, etc.): Prefect 3 States
- Getting a result from a State: get_state_result
Kiran 09/24/2025, 10:29 AM
Marvin 09/24/2025, 10:29 AM
Marvin 09/24/2025, 10:30 AM
Inside ps_task you saw previous_state is "PS_Extend_General_PSwaps done" — that's the upstream task's result string, not a State.
What's happening step-by-step in your "previous code":
- In the flow, prev is a PrefectFuture right after submit(). If you print it there, you'll see a future object.
- When you later do ps_task(..., previous_state=prev), Prefect records a dependency and, at runtime, waits for prev to finish.
- If prev completes successfully (and you did not wrap with allow_failure), Prefect injects the upstream RESULT into the parameter — not the State — so previous_state becomes a string like "PS_Extend_General_PSwaps done".
- If prev fails (and you did not use allow_failure in wait_for or on the parameter), Prefect cancels the downstream task before it runs, so your task body won't execute.
How to receive the State instead of the result
- Wrap the parameter value with allow_failure(prev). That tells Prefect "don't resolve this to a plain result; give me the final State object."
- If you want your task to still run even when the upstream failed, also use allow_failure(prev) in wait_for so the failure doesn't auto-cancel the downstream.
Example
```
from prefect import task
from prefect.states import Cancelled, get_state_result
from prefect.utilities.annotations import allow_failure

# Downstream submission
prev = ps_task.with_options(task_run_name=t).submit(
    t,
    previous_state=allow_failure(prev),  # pass State object into the task
    wait_for=[allow_failure(prev)],      # don't auto-cancel on upstream failure
)

# Inside the task
@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    if previous_state is not None:
        print(type(previous_state), repr(previous_state))  # -> <class 'prefect.states.State'> <State Completed()>
        if not previous_state.is_completed():
            return Cancelled(message="skipping due to upstream failure")
        upstream_value = get_state_result(previous_state)  # recover the prior task's return value
        print(f"Upstream result was: {upstream_value}")
    print(f"Running {name}")
    ...
    return f"{name} done"
```
Key points
- Passing a future as a normal argument → Prefect injects the resolved result by default.
- Passing allow_failure(future) as an argument → Prefect injects the final State object instead.
- Without allow_failure in wait_for, a failed upstream cancels the downstream before it runs; with it, the downstream runs and can inspect the upstream state.
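To make the contrast concrete, a small illustrative flow (upstream and downstream are placeholder tasks, not from your code):
```
from prefect import flow, task
from prefect.utilities.annotations import allow_failure


@task
def upstream():
    return "upstream done"


@task(log_prints=True)
def downstream(previous_state=None):
    # Prints the string "upstream done" when given a plain future,
    # but a State object when given allow_failure(future)
    print(type(previous_state), previous_state)


@flow(log_prints=True)
def contrast_flow():
    prev = upstream.submit()

    # Default: Prefect resolves the future and injects the RESULT
    downstream.submit(previous_state=prev)

    # allow_failure: Prefect injects the final State, and listing it in
    # wait_for keeps the downstream from being auto-cancelled on failure
    downstream.submit(
        previous_state=allow_failure(prev),
        wait_for=[allow_failure(prev)],
    )
```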
Docs on states: Prefect 3 States
Kiran 09/24/2025, 10:47 AM
Marvin 09/24/2025, 10:47 AM
Marvin 09/24/2025, 10:47 AM
Here's a complete example that:
- Passes the upstream State into each chained task with allow_failure(...)
- Skips downstream tasks in a group if the upstream did not complete
- Runs the final group only if all prior groups completed successfully
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.annotations import allow_failure
from prefect.states import Cancelled, get_state_result
from prefect.futures import wait
import time


# -------------------------
# Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name: str, previous_state=None, delay: int = 5):
    # If we passed an upstream future using allow_failure(prev), previous_state is a State
    if previous_state is not None:
        print(f"[{name}] previous_state: {previous_state!r}")
        # Only continue if upstream completed successfully
        if not previous_state.is_completed():
            return Cancelled(message=f"skipping {name} due to upstream not completed")
        # If you need the upstream result value, extract it
        upstream_value = get_state_result(previous_state)
        print(f"[{name}] Upstream result was: {upstream_value}")

    print(f"[{name}] Running")
    if name == "PS_MTM":
        # Example failure
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"


# -------------------------
# Helper to submit a sequential group
# -------------------------
def submit_group(group: list[str]):
    """
    Submit a sequential chain of tasks and return the last future.
    The first task is submitted normally; subsequent tasks receive the upstream State.
    """
    prev = None
    for i, name in enumerate(group):
        if i == 0:
            prev = ps_task.with_options(task_run_name=name).submit(name)
        else:
            prev = ps_task.with_options(task_run_name=name).submit(
                name,
                previous_state=allow_failure(prev),  # pass State into the task
                wait_for=[allow_failure(prev)],      # don't auto-cancel; let task decide
            )
    return prev  # last task in the group


# -------------------------
# Flow
# -------------------------
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    # Define groups; each inner list runs sequentially
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM fails -> downstream in this group will be cancelled by our logic
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group runs only if all prior groups completed
    ]

    # Submit non-final groups concurrently; each group itself is sequential
    non_final_groups = client_tasks[:-1]
    last_futures_of_prior_groups = [submit_group(g) for g in non_final_groups]

    # Wait for the last task in each non-final group concurrently
    prior_states = await wait(last_futures_of_prior_groups)

    # Only run final group if all prior groups completed successfully
    if all(s.is_completed() for s in prior_states):
        final_last_future = submit_group(client_tasks[-1])
        final_state = await final_last_future.wait()
        return final_state.result() if final_state.is_completed() else None
    else:
        print("At least one prior group did not complete; skipping final group")
        return None


if __name__ == "__main__":
    client_flow()
```
Notes:
- Since ps_task is synchronous and uses time.sleep, this uses ThreadPoolTaskRunner for concurrency.
- If you convert ps_task to async (using async def and await asyncio.sleep), switch to ConcurrentTaskRunner instead.
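For reference, a rough sketch of that async variant (the task and flow names here are illustrative, with shortened delays, and it awaits futures the same way as the earlier examples in this thread):
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import asyncio


@task(log_prints=True)
async def ps_task_async(name: str, delay: int = 1):
    print(f"[{name}] Running")
    # Non-blocking sleep keeps the event loop free for other groups
    await asyncio.sleep(delay)
    return f"{name} done"


@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
async def async_client_flow():
    futures = [ps_task_async.submit(n) for n in ["PS_Generate", "PS_MTM"]]
    # Settle each future before the flow finishes
    for fut in futures:
        await fut.wait()
```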
Kiran 09/24/2025, 10:55 AM
Marvin 09/24/2025, 10:55 AM
Marvin 09/24/2025, 10:57 AM
That error happens when previous_state is not a Prefect State object — it's already a resolved value (a string) or even an exception instance. Then previous_state.is_completed() raises an AttributeError.
Two fixes:
1) Make the downstream task robust to either a State or a resolved value
2) Keep using allow_failure(prev) on BOTH the parameter and wait_for so you normally receive a State and the task still runs even if upstream failed
Here's a complete, defensive version that won't crash when previous_state is not a State:
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time

from prefect.utilities.annotations import allow_failure
from prefect.states import State, Cancelled, get_state_result


# -------------------------
# Define Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    # Handle upstream info defensively: it might be a State, a resolved value, or even an exception
    if previous_state is not None:
        print(f"[{name}] previous_state: {previous_state!r} (type={type(previous_state)})")
        if isinstance(previous_state, State):
            # We received the upstream State (expected when using allow_failure on the param)
            if not previous_state.is_completed():
                return Cancelled(message=f"skipping {name}: upstream not completed")
            # If you need the upstream result value:
            try:
                upstream_value = get_state_result(previous_state)
                print(f"[{name}] upstream result: {upstream_value!r}")
            except Exception as exc:
                # If the upstream failed or its result cannot be read, decide what to do
                return Cancelled(message=f"skipping {name}: could not read upstream result: {exc!r}")
        else:
            # We did not receive a State. Prefect resolved the value before passing it.
            # At this point, it's already "successful" from Prefect's perspective.
            # If it's an exception instance, treat it as failure; if it's a normal value, continue.
            if isinstance(previous_state, BaseException):
                return Cancelled(message=f"skipping {name}: upstream exception {previous_state!r}")
            # else: proceed as success

    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"


# -------------------------
# Flow Definition
# -------------------------
@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def client_flow():
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM will fail
        ["PS_LoanAccountSweeper"],
        "PS_Reporting",  # final task
    ]

    # Schedule all groups except the last one
    last_futures_of_prior_groups = []
    for group in client_tasks[:-1]:
        print(f"current group is {group}")
        # First task in the group — starts immediately
        prev = ps_task.with_options(task_run_name=group[0]).submit(group[0])
        # Chain the rest of the tasks in this group
        for t in group[1:]:
            prev = ps_task.with_options(task_run_name=t).submit(
                t,
                previous_state=allow_failure(prev),  # pass upstream State into the param
                wait_for=[allow_failure(prev)],      # don't auto-cancel; let the task decide
            )
        last_futures_of_prior_groups.append(prev)

    # Schedule the last task to start only after all prior groups have settled
    last_group = client_tasks[-1]
    final_future = ps_task.with_options(task_run_name=last_group).submit(
        last_group,
        wait_for=last_futures_of_prior_groups,  # here we just wait; we don't need the upstream states
    )

    # Wait for the entire flow to finish by resolving the last future
```
Marvin 09/24/2025, 10:57 AM
- When allow_failure(prev) is used on the parameter, Prefect will pass the upstream State object to previous_state. We check isinstance(previous_state, State) and branch accordingly.
- If Prefect ever gives you a resolved value (e.g., a string), calling .is_completed() would fail — the defensive checks avoid that.
- Using allow_failure(prev) in wait_for ensures the downstream still runs so it can inspect the upstream State and decide to skip.
If you want the final group to run only if every prior group fully completed, you can also add a gate before submitting the last task by waiting on the prior futures and checking their states — but the above matches your current pattern.
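Such a gate could look roughly like this inside client_flow, after the group-submission loop (a sketch only; it assumes futures expose a .state property as in Prefect 3 and reuses the prefect.futures.wait helper mentioned earlier in this thread):
```
from prefect.futures import wait

# ... inside client_flow, replacing the unconditional final submit ...
# Settle every prior group before deciding whether to run the final task
wait(last_futures_of_prior_groups)

last_group = client_tasks[-1]
if all(f.state.is_completed() for f in last_futures_of_prior_groups):
    final_future = ps_task.with_options(task_run_name=last_group).submit(last_group)
else:
    print("At least one prior group did not complete; skipping the final task")
```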
Refs:
- States (and helpers like is_completed and get_state_result): Prefect 3 States