Kiran
09/24/2025, 6:41 AMMarvin
09/24/2025, 6:42 AMMarvin
09/24/2025, 6:43 AMfut.is_completed() immediately after submit(), which will almost always be False because the task hasn’t finished yet. You need to await the future (e.g., await fut.wait()).
- Don’t call asyncio.run() inside a Prefect flow. Prefect already manages the event loop for async flows. Just await your async helpers from the flow.
Below are two ways to fix it. The second “simpler pattern” is the most Prefect-native for running groups concurrently.
Option A — minimal changes to your structure
- Use ConcurrentTaskRunner for async tasks
- Await .wait() inside execute_group
- Call await main() inside the flow instead of asyncio.run(main())
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import asyncio
@task(log_prints=True)
async def ps_task(name, delay=5):
    """Simulate one pipeline step as a non-blocking Prefect task.

    Args:
        name: Step name. "PS_TPLSweep_General_PSwaps" always raises, and
            "PS_Extend_General_PSwaps" sleeps 10 extra seconds.
        delay: Seconds to sleep before returning.

    Returns:
        The string "<name> done".
    """
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    # Only the Extend step gets the additional 10-second pause.
    extra_delay = 10 if name == "PS_Extend_General_PSwaps" else 0
    if extra_delay:
        await asyncio.sleep(extra_delay)
    await asyncio.sleep(delay)
    return f"{name} done"
@task(log_prints=True)
def skipping_task(task_name):
    """Marker task recorded when a group stops early.

    Args:
        task_name: Name of the task at which the group halted.

    Returns:
        The string "<task_name> group skipped".
    """
    message = f"{task_name} group skipped"
    return message
async def execute_group(group):
    """
    Run the tasks of one group strictly in order, stopping at the first failure.

    Args:
        group: Ordered list of task names; each name is submitted to ``ps_task``.

    Returns:
        True if every task in the group completed, False if the group halted
        early because a task did not complete.
    """
    previous_future = None
    for task_name in group:
        fut = ps_task.with_options(task_run_name=task_name).submit(
            task_name,
            wait_for=[previous_future] if previous_future else None,
        )
        # PrefectFuture.wait() is a blocking call that returns None, so it
        # cannot be awaited. Run it in a worker thread so sibling groups driven
        # by asyncio.gather keep progressing, then read the final state.
        await asyncio.to_thread(fut.wait)
        state = fut.state
        if not state.is_completed():
            print(f"{task_name} failed → skipping rest of group")
            skipping_task.submit(task_name)
            return False
        print(f"{task_name} completed")
        previous_future = fut
    return True
async def main():
    """Run every group but the last concurrently, then gate the final group.

    The final group is executed only when all earlier groups report success.
    """
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group
    ]
    *non_final_groups, final_group = client_tasks
    # Each group is sequential internally; the groups themselves overlap.
    results = await asyncio.gather(*map(execute_group, non_final_groups))
    if not all(results):
        print("At least one group failed; skipping final group")
        return
    await execute_group(final_group)
@flow(task_runner=ConcurrentTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    """Prefect entry flow: delegates all orchestration to ``main``."""
    await main()


if __name__ == "__main__":
    # client_flow is an async flow, so calling it only creates a coroutine.
    # asyncio.run drives it to completion from this synchronous entry point
    # (this is outside the flow, so it does not nest event loops).
    asyncio.run(client_flow())
Option B — simpler Prefect-native pattern (recommended)
Let Prefect handle sequencing inside each group with wait_for, submit all groups at once, and just wait on the last task of each group concurrently. Then decide whether to run the final group.
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.futures import wait
import asyncio
@task(log_prints=True)
async def ps_task(name, delay=5):
    """Simulate a single named step without blocking the event loop.

    Args:
        name: Step name. "PS_TPLSweep_General_PSwaps" raises, and
            "PS_Extend_General_PSwaps" waits an extra 10 seconds.
        delay: Seconds to wait before finishing.

    Returns:
        The string "<name> done".
    """
    print(f"Running {name}")
    if name == "PS_TPLSweep_General_PSwaps":
        raise Exception("custom failure")
    pauses = [delay]
    if name == "PS_Extend_General_PSwaps":
        # The step-specific pause runs before the common one.
        pauses.insert(0, 10)
    for seconds in pauses:
        await asyncio.sleep(seconds)
    return f"{name} done"
def submit_group(group):
"""Submit a sequential chain of tasks and return the last future."""
prev = None
for name in group:
fut = ps_task.with_options(task_run_name=name).submit(
name,
wait_for=[prev] if prev else None,
)
prev = fut
return prev # last task in the groupMarvin
09/24/2025, 6:43 AM.submit() returns a Prefect future. To block until it’s finished and get its final state, use await fut.wait(). Prefect also provides a helper to wait for many futures concurrently: await prefect.futures.wait([...]). Docs: prefect.futures.wait
- Since your tasks are async (async def + await asyncio.sleep), ConcurrentTaskRunner is a better fit than ThreadPoolTaskRunner.
- Don’t use asyncio.run() inside a Prefect flow; just await your async functions from the flow.Kiran
09/24/2025, 10:15 AMMarvin
09/24/2025, 10:15 AMMarvin
09/24/2025, 10:18 AMprevious_state parameter is receiving the previous task’s RESULT (a string like PS_Extend_General_PSwaps done), not its State. That’s why your print shows the string and why calling .get_state() won’t work.
To inspect whether the upstream task completed or failed inside ps_task, pass the upstream future with allow_failure(...) as the argument value. That tells Prefect to deliver the upstream State object to your task argument instead of the resolved result. You’ll also want allow_failure in wait_for so Prefect doesn’t auto-cancel and you can decide in-task.
What to change in your flow submission loop
# Start the group's first task; it has no upstream dependency
first_future = ps_task.with_options(task_run_name=group[0]).submit(group[0])
prev = first_future
# Every later task is chained onto the one submitted just before it
for t in group[1:]:
    prev = ps_task.with_options(task_run_name=t).submit(
        t,
        # allow_failure on the argument: the task receives the upstream outcome
        previous_state=allow_failure(prev),
        # allow_failure in wait_for: the task still runs if the upstream failed
        wait_for=[allow_failure(prev)],
    )
How to check the upstream state in ps_task
from prefect import task
from prefect.states import Cancelled, get_state_result
@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    """Run one named step, skipping it when the upstream step did not succeed.

    Args:
        name: Step name; "PS_MTM" raises to simulate a failure.
        previous_state: Upstream outcome supplied via ``allow_failure(prev)``.
            It may arrive as a State, an already-resolved result value, or an
            exception instance, so all three are handled.
        delay: Seconds to sleep before returning.

    Returns:
        The string "<name> done", or a Cancelled state when the step is skipped.
    """
    # Local imports: this snippet's import lines provide neither name.
    import time
    from prefect.states import State

    if previous_state is not None:
        print(f"previous_state: {previous_state!r}")
        if isinstance(previous_state, State):
            if not previous_state.is_completed():
                return Cancelled(message="skipping due to upstream state")
            upstream_value = get_state_result(previous_state)
        elif isinstance(previous_state, BaseException):
            # A failed upstream can be delivered as its exception.
            return Cancelled(message="skipping due to upstream state")
        else:
            # Already resolved to the upstream's return value.
            upstream_value = previous_state
        print(f"Upstream result was: {upstream_value}")
    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
Alternative: if you don’t need to inspect the upstream state (you just want to skip on failure), you can drop the previous_state parameter entirely and rely on wait_for=[prev] without allow_failure. Prefect will automatically cancel downstream tasks when an upstream dependency fails.
Refs:
- States and state checks (is_completed, is_failed, etc.): Prefect 3 States
- Getting a result from a State: get_state_resultKiran
09/24/2025, 10:29 AMMarvin
09/24/2025, 10:29 AMMarvin
09/24/2025, 10:30 AMps_task you saw previous_state is PS_Extend_General_PSwaps done — that’s the upstream task’s result string, not a State.
What’s happening step-by-step in your “previous code”:
- In the flow, prev is a PrefectFuture right after submit(). If you print it there, you’ll see a future object.
- When you later do ps_task(..., previous_state=prev), Prefect records a dependency and, at runtime, waits for prev to finish.
- If prev completes successfully (and you did not wrap with allow_failure), Prefect injects the upstream RESULT into the parameter — not the State — so previous_state becomes a string like "PS_Extend_General_PSwaps done".
- If prev fails (and you did not use allow_failure in wait_for or on the parameter), Prefect cancels the downstream task before it runs, so your task body won’t execute.
How to receive the State instead of the result
- Wrap the parameter value with allow_failure(prev). That tells Prefect “don’t resolve this to a plain result; give me the final State object.”
- If you want your task to still run even when the upstream failed, also use allow_failure(prev) in wait_for so the failure doesn’t auto-cancel the downstream.
Example
from prefect.utilities.annotations import allow_failure
from prefect.states import get_state_result
# Submit the downstream task, chained on the previous future
prev = ps_task.with_options(task_run_name=t).submit(
    t,
    previous_state=allow_failure(prev),  # hand the upstream outcome to the task
    wait_for=[allow_failure(prev)],  # keep running even if the upstream failed
)
# Inside the task
@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    """Example step that inspects its upstream outcome before doing work.

    Args:
        name: Step name, used in log output and the return value.
        previous_state: Upstream outcome supplied via ``allow_failure(prev)``.
            It may be a State, a resolved result value, or an exception
            instance.
        delay: Unused in this abbreviated example.

    Returns:
        The string "<name> done", or a Cancelled state when the step is skipped.
    """
    # Local import: the example's import lines only bring in get_state_result.
    from prefect.states import Cancelled, State

    if previous_state is not None:
        print(type(previous_state), repr(previous_state))
        if isinstance(previous_state, State):
            if not previous_state.is_completed():
                return Cancelled(message="skipping due to upstream failure")
            # Recover the prior task's return value from its state.
            upstream_value = get_state_result(previous_state)
        elif isinstance(previous_state, BaseException):
            return Cancelled(message="skipping due to upstream failure")
        else:
            upstream_value = previous_state
        print(f"Upstream result was: {upstream_value}")
    print(f"Running {name}")
    ...
    return f"{name} done"
Key points
- Passing a future as a normal argument → Prefect injects the resolved result by default.
- Passing allow_failure(future) as an argument → Prefect injects the final State object instead.
- Without allow_failure in wait_for, a failed upstream cancels the downstream before it runs; with it, the downstream runs and can inspect the upstream state.
Docs on states: Prefect 3 StatesKiran
09/24/2025, 10:47 AMMarvin
09/24/2025, 10:47 AMMarvin
09/24/2025, 10:47 AMallow_failure(...)
- Skips downstream tasks in a group if the upstream did not complete
- Runs the final group only if all prior groups completed successfully
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.utilities.annotations import allow_failure
from prefect.states import Cancelled, get_state_result
from prefect.futures import wait
import time
# -------------------------
# Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name: str, previous_state=None, delay: int = 5):
    """Run one named step, skipping it when the upstream step did not succeed.

    Args:
        name: Step name; "PS_MTM" raises as an example failure.
        previous_state: Upstream outcome supplied via ``allow_failure(prev)``.
            It may be a State, a resolved result value, or an exception
            instance; calling State methods on the latter two would raise
            AttributeError, so the type is checked first.
        delay: Seconds to sleep before returning.

    Returns:
        The string "<name> done", or a Cancelled state when the step is skipped.
    """
    # Local import: State is not among this snippet's import lines.
    from prefect.states import State

    if previous_state is not None:
        print(f"[{name}] previous_state: {previous_state!r}")
        if isinstance(previous_state, State):
            # Only continue if the upstream completed successfully.
            if not previous_state.is_completed():
                return Cancelled(message=f"skipping {name} due to upstream not completed")
            upstream_value = get_state_result(previous_state)
        elif isinstance(previous_state, BaseException):
            # A failed upstream can be delivered as its exception.
            return Cancelled(message=f"skipping {name} due to upstream not completed")
        else:
            upstream_value = previous_state
        print(f"[{name}] Upstream result was: {upstream_value}")
    print(f"[{name}] Running")
    if name == "PS_MTM":
        # Example failure
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
# -------------------------
# Helper to submit a sequential group
# -------------------------
def submit_group(group: list[str]):
    """Submit the tasks of a group as a chain and return the final future.

    The first task is submitted with no upstream. Each later task is given the
    previous future both as its ``previous_state`` argument and in
    ``wait_for``, wrapped in ``allow_failure`` in both places.

    Args:
        group: Ordered task names for one sequential group.

    Returns:
        The future of the last submitted task, or None for an empty group.
    """
    prev = None
    for name in group:
        named_task = ps_task.with_options(task_run_name=name)
        if prev is None:
            # Head of the chain: nothing to depend on.
            prev = named_task.submit(name)
            continue
        prev = named_task.submit(
            name,
            previous_state=allow_failure(prev),
            wait_for=[allow_failure(prev)],
        )
    return prev
# -------------------------
# Flow
# -------------------------
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
async def client_flow():
    """Run all non-final groups concurrently, then the final group if they succeeded.

    Returns:
        The result of the final group's last task when it completes; None when
        the final group is skipped or does not complete.
    """
    # Each inner list runs sequentially.
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM fails on purpose
        ["PS_LoanAccountSweeper"],
        ["PS_Reporting"],  # final group runs only if all prior groups completed
    ]
    # Submit non-final groups up front; each group is itself a sequential chain.
    non_final_groups = client_tasks[:-1]
    last_futures_of_prior_groups = [submit_group(g) for g in non_final_groups]
    # prefect.futures.wait is a blocking call that returns (done, not_done)
    # futures. It is not awaitable and yields no states, so block on it and
    # then read each future's final state.
    wait(last_futures_of_prior_groups)
    prior_states = [f.state for f in last_futures_of_prior_groups]
    if not all(s.is_completed() for s in prior_states):
        print("At least one prior group did not complete; skipping final group")
        return None
    final_last_future = submit_group(client_tasks[-1])
    # PrefectFuture.wait() also blocks and returns None; the state is read after.
    final_last_future.wait()
    if final_last_future.state.is_completed():
        return final_last_future.result()
    return None
if __name__ == "__main__":
    # Local import: asyncio is not among this snippet's import lines.
    import asyncio

    # client_flow is declared async, so a bare call only builds a coroutine.
    asyncio.run(client_flow())
Notes:
- Since ps_task is synchronous and uses time.sleep, this uses ThreadPoolTaskRunner for concurrency.
- If you convert ps_task to async (using async def and await asyncio.sleep), switch to ConcurrentTaskRunner instead.Kiran
09/24/2025, 10:55 AMMarvin
09/24/2025, 10:55 AMMarvin
09/24/2025, 10:57 AMprevious_state is not a Prefect State object — it’s already a resolved value (a string) or even an exception instance. Then previous_state.is_completed() raises an AttributeError.
Two fixes:
1) Make the downstream task robust to either a State or a resolved value
2) Keep using allow_failure(prev) on BOTH the parameter and wait_for so you normally receive a State and the task still runs even if upstream failed
Here’s a complete, defensive version that won’t crash when previous_state is not a State:
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time
from prefect.utilities.annotations import allow_failure
from prefect.states import State, Cancelled, get_state_result
# -------------------------
# Define Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name, previous_state=None, delay=20):
    """Run one named step after checking the upstream outcome defensively.

    Args:
        name: Step name; "PS_MTM" raises to simulate a failure.
        previous_state: Upstream outcome. Handled whether it is a State, a
            resolved value, or an exception instance.
        delay: Seconds to sleep before returning.

    Returns:
        The string "<name> done", or a Cancelled state when the step is skipped.
    """
    if previous_state is not None:
        print(f"[{name}] previous_state: {previous_state!r} (type={type(previous_state)})")

    if isinstance(previous_state, State):
        # A real State: require completion, then try to read its result.
        if not previous_state.is_completed():
            return Cancelled(message=f"skipping {name}: upstream not completed")
        try:
            upstream_value = get_state_result(previous_state)
            print(f"[{name}] upstream result: {upstream_value!r}")
        except Exception as exc:
            # The result could not be read; treat that as a reason to skip.
            return Cancelled(message=f"skipping {name}: could not read upstream result: {exc!r}")
    elif isinstance(previous_state, BaseException):
        # Not a State: an exception instance means the upstream failed.
        return Cancelled(message=f"skipping {name}: upstream exception {previous_state!r}")
    # Any other value (including None) is treated as success.

    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
# -------------------------
# Flow Definition
# -------------------------
@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def client_flow():
    """Submit every group but the last as a chain, then the final task.

    Each group is a chain in which a task receives the previous future through
    ``allow_failure``. The final task is submitted with ``wait_for`` set to the
    last future of every prior group.
    """
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM will fail
        ["PS_LoanAccountSweeper"],
        "PS_Reporting",  # final task
    ]
    # Schedule all groups except the last one
    last_futures_of_prior_groups = []
    for group in client_tasks[:-1]:
        print(f"current group is {group}")
        # First task in the group starts immediately
        prev = ps_task.with_options(task_run_name=group[0]).submit(group[0])
        # Chain the rest of the tasks in this group
        for t in group[1:]:
            prev = ps_task.with_options(task_run_name=t).submit(
                t,
                previous_state=allow_failure(prev),  # hand the upstream outcome to the task
                wait_for=[allow_failure(prev)],  # don't auto-cancel; let the task decide
            )
        last_futures_of_prior_groups.append(prev)
    # Schedule the last task to start only after all prior groups have settled
    last_group = client_tasks[-1]
    final_future = ps_task.with_options(task_run_name=last_group).submit(
        last_group,
        wait_for=last_futures_of_prior_groups,  # only wait; upstream values are not needed
    )
    # NOTE(review): the original snippet is cut off after this submit; the step
    # that resolves final_future is not shown.
# Wait for the entire flow to finish by resolving the last futureMarvin
09/24/2025, 10:57 AMallow_failure(prev) is used on the parameter, Prefect will pass the upstream State object to previous_state. We check isinstance(previous_state, State) and branch accordingly.
- If Prefect ever gives you a resolved value (e.g., a string), calling .is_completed() would fail — the defensive checks avoid that.
- Using allow_failure(prev) in wait_for ensures the downstream still runs so it can inspect the upstream State and decide to skip.
If you want the final group to run only if every prior group fully completed, you can also add a gate before submitting the last task by waiting on the prior futures and checking their states — but the above matches your current pattern.
Refs:
- States (and helpers like is_completed and get_state_result): Prefect 3 StatesKiran
09/29/2025, 9:38 AMMarvin
09/29/2025, 9:38 AMMarvin
09/29/2025, 9:40 AMprefect.states.get_state_result expects a State, not a PrefectFuture. If you have a future, either:
- call future.result() directly, or
- get its state with future.state and pass that to get_state_result.
Example:
from prefect.states import get_state_result
# Submitting a task returns a future, not the task's return value
future = some_task.submit(1)
# Easiest: ask the future for its result directly
value = future.result()
# Or via state: read the future's state and pass that State to get_state_result
state = future.state
value_from_state = get_state_result(state)
If you have nested structures of futures, prefect.futures.resolve_futures_to_results(expr) can resolve them in-place.
Source: prefect.states.get_state_result sourceKiran
09/29/2025, 11:21 AMMarvin
09/29/2025, 11:21 AMMarvin
09/29/2025, 11:24 AMfrom prefect.states import get_state_exception
if state.is_failed():
    # Optional: pull the exception out of the state to inspect it
    exc = get_state_exception(state)
    print("Task failed:", exc)
- Or compare the type:
from prefect.client.schemas.objects import StateType
# Compare the state's type against the FAILED enum member
if state.type == StateType.FAILED:
    ...
For reference: prefect.states sourceKiran
09/29/2025, 11:29 AMfrom prefect.states import get_state_exception
if state.is_failed():
    # Optional: pull the exception out of the state to inspect it
    exc = get_state_exception(state)
    print("Task failed:", exc)
I need to check if the task has reached the final state named Failed.Marvin
09/29/2025, 11:29 AMMarvin
09/29/2025, 11:30 AMfrom prefect.states import get_state_exception
# Block until the run reaches a final state, then inspect that state
future.wait()
state = future.state
if state.is_failed():
    exc = get_state_exception(state)
    print("Task failed:", exc)
- Or, if you only have a state object and are polling/checking:
from prefect.client.schemas.objects import StateType
# Require both a final state and the FAILED type
if state.is_final() and state.type == StateType.FAILED:
    ...
Note: state.is_failed() only returns True for the terminal Failed state. If the run is still Retrying/AwaitingRetry/etc., it won’t be considered failed yet. For reference: prefect.states sourceKiran
09/29/2025, 11:31 AMMarvin
09/29/2025, 11:32 AMMarvin
09/29/2025, 11:32 AMstate.is_failed() just returns False for a Running state (and for Pending, Retrying, etc.) and does not raise.
get_state_exception(state) will raise if the state is not Failed or Crashed — but in your snippet it’s only called when is_failed() is True, so you’re safe.
Example pattern:
from prefect.states import get_state_exception
if state.is_failed():
    # Only reached for a Failed state, so reading the exception is safe here
    exc = get_state_exception(state)
    print("Task failed:", exc)
elif not state.is_final():
    # Still in progress: report where the run currently is
    print(f"Task not finished yet; current state: {state.name}")
Reference: prefect.states sourceKiran
09/29/2025, 12:12 PMMarvin
09/29/2025, 12:12 PMexplore_module_offerings
📊 Tools used:
• display_callable_signature (3x)
• explore_module_offerings (4x)
• research_prefect_topic (4x)Marvin
09/29/2025, 1:39 PMMarvin
09/29/2025, 1:44 PMPS_MTM) to demonstrate skipping/cancellation in that group and the final group.
```
from typing import List, Dict, Callable, Any
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
# ---- Define your tasks (each accepts an optional "upstream" it ignores, used only to create UI edges) ----
@task
def PS_Extend_General_PSwaps(_upstream: Any = None):
    """Demo step: log and return "extend-ok". ``_upstream`` is ignored."""
    logger = get_run_logger()
    logger.info("Extend general PSwaps")
    return "extend-ok"


@task
def PS_TPLSweep_General_PSwaps(_upstream: Any = None):
    """Demo step: log and return "tplsweep-ok". ``_upstream`` is ignored."""
    logger = get_run_logger()
    logger.info("TPL sweep for general PSwaps")
    return "tplsweep-ok"


@task
def PS_Generate(_upstream: Any = None):
    """Demo step: log and return "generate-ok". ``_upstream`` is ignored."""
    logger = get_run_logger()
    logger.info("Generate")
    return "generate-ok"


@task
def PS_MTM(_upstream: Any = None):
    """Demo step that always fails: logs, then raises RuntimeError."""
    logger = get_run_logger()
    logger.info("MTM - going to fail for demo")
    raise RuntimeError("MTM failed")


@task
def PS_Sweeping(_upstream: Any = None):
    """Demo step: log and return "sweeping-ok". ``_upstream`` is ignored."""
    logger = get_run_logger()
    logger.info("Sweeping")
    return "sweeping-ok"


@task
def PS_LoanAccountSweeper(_upstream: Any = None):
    """Demo step: log and return "loan-sweeper-ok". ``_upstream`` is ignored."""
    logger = get_run_logger()
    logger.info("Loan account sweep")
    return "loan-sweeper-ok"


@task
def PS_Reporting(_upstream: Any = None):
    """Demo step: log and return "reporting-ok". ``_upstream`` is ignored."""
    logger = get_run_logger()
    logger.info("Final reporting")
    return "reporting-ok"
# A simple barrier task that only exists to create a dependency on multiple upstreams
@task
def barrier(*_deps: Any) -> bool:
    """Fan-in marker: accepts any number of upstream values and returns True.

    The arguments are never read; passing them only makes this task depend on
    whatever produced them.
    """
    return True
# Registry to map your string names to task callables
TASKS: Dict[str, Callable[..., Any]] = {
    # Sequential pair
    "PS_Extend_General_PSwaps": PS_Extend_General_PSwaps,
    "PS_TPLSweep_General_PSwaps": PS_TPLSweep_General_PSwaps,
    # Three-step chain; PS_MTM raises in this demo
    "PS_Generate": PS_Generate,
    "PS_MTM": PS_MTM,
    "PS_Sweeping": PS_Sweeping,
    # Single-step groups
    "PS_LoanAccountSweeper": PS_LoanAccountSweeper,
    "PS_Reporting": PS_Reporting,
}
@flow(name="client-orchestrator", task_runner=ThreadPoolTaskRunner())
def orchestrate(client_tasks: List[List[str]]):
    """
    Submit every group except the last so the groups overlap in time.

    - All groups except the last are submitted up front and run concurrently.
    - Tasks inside each group run in sequence: each task receives the previous
      task's future as its argument, which creates the dependency edge.
    - A task whose upstream future failed is not run by Prefect.

    Args:
        client_tasks: Groups of task names; every name must be a key of TASKS.
    """
    # Kick off all groups except the last in parallel
    last_futures_of_prior_groups = []
    for group in client_tasks[:-1]:
        prev = None
        for idx, task_name in enumerate(group):
            t = TASKS[task_name]
            # .submit() returns a future immediately. A direct call would block
            # until the task finished and raise on failure, so the groups would
            # never overlap and there would be no future to chain on.
            prev = t.submit(prev) if idx > 0 else t.submit()
        last_futures_of_prior_groups.append(prev)
# Create a barrier that waits on the last task of every prior group
barrier_future = barrier(*last_futures_of_prior_groups)Marvin
09/29/2025, 1:44 PMThreadPoolTaskRunner makes this explicit.
Follow-ups:
- Do you want the final group to run even if a prior group had a failure but you consider that acceptable? If so, we can wrap dependencies in prefect.allow_failure() selectively.
- Do any tasks need to pass real data to the next one? If yes, we’ll thread their outputs instead of the dummy _upstream argument above.Kiran
09/30/2025, 6:00 AMMarvin
09/30/2025, 6:00 AMMarvin
09/30/2025, 6:02 AMrun_step(name, ...) is reused for all steps
- Real work is implemented by plain Python handler functions (no extra @task)
- A “barrier” is just another call to the same run_step with name "BARRIER" that depends on all prior groups’ last steps
Note: In the UI, tasks that are not run because an upstream failed will show as Cancelled (grey). That’s Prefect’s canonical “skipped due to upstream failure.”
```
from typing import Any, Callable, Dict, List, Optional
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
# ---- Plain Python handlers for your actual work (no @task here) ----
def do_PS_Extend_General_PSwaps(upstream: Any = None, **kwargs) -> Any:
    """Handler: log and return "extend-ok". Arguments are ignored."""
    logger = get_run_logger()
    logger.info("Extend general PSwaps")
    return "extend-ok"


def do_PS_TPLSweep_General_PSwaps(upstream: Any = None, **kwargs) -> Any:
    """Handler: log and return "tplsweep-ok". Arguments are ignored."""
    logger = get_run_logger()
    logger.info("TPL sweep for general PSwaps")
    return "tplsweep-ok"


def do_PS_Generate(upstream: Any = None, **kwargs) -> Any:
    """Handler: log and return "generate-ok". Arguments are ignored."""
    logger = get_run_logger()
    logger.info("Generate")
    return "generate-ok"


def do_PS_MTM(upstream: Any = None, **kwargs) -> Any:
    """Handler that always fails: logs, then raises RuntimeError."""
    logger = get_run_logger()
    logger.info("MTM - failing for demo")
    raise RuntimeError("MTM failed")


def do_PS_Sweeping(upstream: Any = None, **kwargs) -> Any:
    """Handler: log and return "sweeping-ok". Arguments are ignored."""
    logger = get_run_logger()
    logger.info("Sweeping")
    return "sweeping-ok"


def do_PS_LoanAccountSweeper(upstream: Any = None, **kwargs) -> Any:
    """Handler: log and return "loan-sweeper-ok". Arguments are ignored."""
    logger = get_run_logger()
    logger.info("Loan account sweep")
    return "loan-sweeper-ok"


def do_PS_Reporting(upstream: Any = None, **kwargs) -> Any:
    """Handler: log and return "reporting-ok". Arguments are ignored."""
    logger = get_run_logger()
    logger.info("Final reporting")
    return "reporting-ok"
def do_BARRIER(upstream: Any = None, deps: Optional[List[Any]] = None, **kwargs) -> bool:
    """Fan-in handler: log that the barrier was reached and return True.

    ``deps`` is never read here; it exists so the calling task can be made to
    depend on the last step of every prior group.
    """
    logger = get_run_logger()
    logger.info("Barrier passed")
    return True
# ---- Registry that maps names to handler functions ----
HANDLERS: Dict[str, Callable[..., Any]] = {
    # Sequential pair
    "PS_Extend_General_PSwaps": do_PS_Extend_General_PSwaps,
    "PS_TPLSweep_General_PSwaps": do_PS_TPLSweep_General_PSwaps,
    # Three-step chain; PS_MTM raises in this demo
    "PS_Generate": do_PS_Generate,
    "PS_MTM": do_PS_MTM,
    "PS_Sweeping": do_PS_Sweeping,
    # Single-step groups
    "PS_LoanAccountSweeper": do_PS_LoanAccountSweeper,
    "PS_Reporting": do_PS_Reporting,
    # Fan-in step
    "BARRIER": do_BARRIER,
}
# ---- One task function reused for all steps ----
# A template string is used for the run name: Prefect fills "{name}" from the
# task's own arguments. A callable here is not invoked with those arguments,
# so a lambda requiring a positional ``name`` fails when the name is resolved.
@task(task_run_name="{name}")
def run_step(
    name: str,
    upstream: Any = None,  # used only to create a dependency arrow
    deps: Optional[List[Any]] = None,  # upstream futures for fan-in dependencies (barrier)
    params: Optional[Dict[str, Any]] = None,
) -> Any:
    """Look up the handler registered for ``name`` and run it.

    Args:
        name: Key into HANDLERS selecting the work to perform.
        upstream: Forwarded to the handler as ``upstream``.
        deps: Forwarded to the handler as ``deps``.
        params: Extra keyword arguments for the handler; None means none.

    Returns:
        Whatever the selected handler returns.

    Raises:
        ValueError: If ``name`` has no registered handler.
    """
    handler = HANDLERS.get(name)
    if handler is None:
        raise ValueError(f"Unknown step name: {name}")
    # Pass through upstream/deps/params for handlers that want them
    return handler(upstream=upstream, deps=deps, **(params or {}))
@flow(name="client-orchestrator", task_runner=ThreadPoolTaskRunner())
def orchestrate(
client_tasks: List[List[str]],
params_by_step: Optional[Dict[str, Dict[str, Any]]] = None,
):
"""
- All groups except the last run concurrently.
- Steps inside each group run in strict sequence (UI shows arrows).
- If a step fails, remaining steps in that group are cancelled.
- Final group runs only if all prior groups complete successfully; otherwise it is cancelled.
"""
params_by_step = params_by_step or {}
# Start all groups except the last in parallel
last_futures = []
for group in client_tasks[ 1]Marvin
09/30/2025, 6:02 AM