Kiran
09/24/2025, 2:07 PM
Marvin
09/24/2025, 2:07 PM
Marvin
09/24/2025, 2:11 PM
Cancelled.
- Returning a Prefect State from a task or flow sets that run’s final state.
- You can use `name`/`message` on the state to make the reason clear in the UI.
Examples
- Skip a single task
```python
from prefect import flow, task
from prefect.exceptions import CancelledRun
from prefect.states import Cancelled


@task
def maybe_skip(do_skip: bool):
    if do_skip:
        # Returning a state sets this task run's final state
        return Cancelled(name="SKIPPED", message="Condition not met")
    return "ok"


@flow
def my_flow():
    try:
        result = maybe_skip(True)
    except CancelledRun:
        # Optional: swallow the cancellation so the flow can continue
        result = None
    print("continuing...")
```
- Skip the entire flow
```python
from prefect import flow
from prefect.states import Cancelled


@flow
def my_flow():
    return Cancelled(name="SKIPPED", message="Whole flow skipped")
```
- If you prefer the UI to stay green but still “no-op,” you can mark the task Completed with a message
```python
from prefect import task
from prefect.states import Completed


@task
def no_op_but_green():
    return Completed(name="SKIPPED", message="Intentionally skipped")
```
Docs:
- State API in Prefect 3.x: prefect.states
Quick check: Do you want downstream logic to continue when this task is “skipped,” and what state do you want the overall flow to end in (Completed vs Cancelled)? I can tailor the pattern accordingly.
Kiran
09/24/2025, 2:13 PM
Marvin
09/24/2025, 2:13 PM
Marvin
09/24/2025, 2:26 PM
- You’re calling skipping_task(name) inside ps_task but not returning its state. That creates a separate child task that goes Cancelled, while ps_task continues to run. To “skip” the current task, return a Cancelled state from the current task itself.
- previous_state is a Prefect State (because of allow_failure(prev)), but you’re comparing it to a string like f"{previous_task} done". Instead, inspect the state type (Completed/Failed/Cancelled) and, if needed, extract the result from the state.
- For dependencies that should not auto-cancel downstream tasks, you must wrap them with allow_failure in both:
  - the parameter you pass (you did this), and
  - the wait_for list (you did this within the group, but not for the final task).
Fixes
1) Return Cancelled from the current task (don’t call another task to skip)
2) Inspect upstream state correctly
3) Make the final wait_for soft if you want it to run regardless of prior failures
Concrete changes
- Update your task to check the upstream state and return Cancelled when you want to skip:
```python
import time

from prefect import task
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled


@task(log_prints=True)
def ps_task(name, previous_task=None, previous_state=None, delay=20):
    print(f"previous task is {previous_task}")
    if previous_state is not None:
        # If upstream did not complete, skip this task
        if previous_state.type != StateType.COMPLETED:
            return Cancelled(
                name="SKIPPING",
                message=f"skipping {name}: upstream not completed; upstream state={previous_state.type}",
            )
        # Optional: check the actual upstream result if you care about its value.
        # State.result() is the public accessor and can be called from a sync task.
        upstream_result = previous_state.result(raise_on_failure=False)
        if upstream_result != f"{previous_task} done":
            return Cancelled(
                name="SKIPPING",
                message=f"skipping {name}: unexpected upstream result={upstream_result!r}",
            )
    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
```
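If you want to unit-test the skip decision without running a flow, one option is to pull it out of the task into a plain function. This is only a sketch: `skip_reason` is a hypothetical helper (not part of Prefect), and it assumes you pass the upstream state type as a plain string such as `"COMPLETED"`, or `None` when there is no upstream task.

```python
from typing import Any, Optional


def skip_reason(
    upstream_type: Optional[str],
    upstream_result: Any,
    expected_result: Any,
) -> Optional[str]:
    """Return why the current task should be skipped, or None to run it.

    Hypothetical helper: upstream_type is the state type name as a string,
    or None when the task has no upstream dependency.
    """
    if upstream_type is None:
        return None  # first task in a chain: nothing to check
    if upstream_type != "COMPLETED":
        return f"upstream not completed; upstream state={upstream_type}"
    if upstream_result != expected_result:
        return f"unexpected upstream result={upstream_result!r}"
    return None


# Stand-in values instead of real Prefect states
print(skip_reason(None, None, "PS_A done"))
print(skip_reason("FAILED", None, "PS_A done"))
print(skip_reason("COMPLETED", "PS_A done", "PS_A done"))
```

Inside ps_task you could then call it with `previous_state.type.name` and the upstream result, and return `Cancelled(name="SKIPPING", message=...)` whenever it gives back a reason.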
- Keep using allow_failure(prev) for both the parameter and the dependency within groups (you already do that)
- If you want the final task to run even if earlier groups failed/cancelled, also mark those wait_for dependencies as allow-failure:
```python
from prefect import allow_failure

final_future = ps_task.with_options(task_run_name=last_group).submit(
    last_group,
    wait_for=[allow_failure(f) for f in last_futures_of_prior_groups],
)
```
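I haven't seen how you build last_futures_of_prior_groups, so here is one possible way to collect it, as a sketch only. It assumes you keep your futures in a dict that maps each group name to the ordered list of futures you submitted for it; `last_of_each_group` and the `groups` layout are hypothetical, and plain strings stand in for real futures.

```python
from typing import Dict, List, TypeVar

F = TypeVar("F")


def last_of_each_group(groups: Dict[str, List[F]], upto: str) -> List[F]:
    """Collect the last item of every non-empty group listed before `upto`."""
    last_items: List[F] = []
    for group_name, futures in groups.items():
        if group_name == upto:
            break  # stop at the final group; only earlier groups count
        if futures:
            last_items.append(futures[-1])
    return last_items


# Stand-in strings instead of real Prefect futures (dicts keep insertion order)
groups = {"g1": ["a1", "a2"], "g2": [], "g3": ["c1"], "final": []}
last_futures_of_prior_groups = last_of_each_group(groups, upto="final")
print(last_futures_of_prior_groups)  # ['a2', 'c1']
```

With real futures, each item in that list would then be wrapped in allow_failure when passed to wait_for.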
- If you don’t want the flow to error when the final task is Cancelled or Failed, wrap the resolution:
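```python
from prefect import flow
from prefect.exceptions import CancelledRun, FailedRun
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def client_flow():
    # ... schedule all tasks as you have ...
    try:
        return final_future.result()
    # Note: a task that failed by raising usually re-raises its own exception
    # from .result(), so catch that type too if you need to handle it here.
    except (CancelledRun, FailedRun) as exc:
        print(f"flow finishing despite final task state: {type(exc).__name__}")
        return None
```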
Reference
- State API (returning Cancelled, inspecting state.type, extracting results): prefect.states
If you share how you want the final task to behave when earlier groups fail/cancel (run anyway vs auto-cancel), I can tailor the wait_for usage exactly.