<@ULVA73B9P> from prefect import task,flow from p...
# ask-community
k
@Marvin
```python
from prefect import task, flow
from prefect.states import Cancelled

@task(log_prints=True)
def skipping_task():
    return Cancelled(message=f"skipping", name="SKIPPING")

@flow(log_prints=True)
def my_flow():
    skipping_task()

my_flow()
```
Skipping is working here.
m
thought for 231.3 seconds
Nice, that pattern works in Prefect 3.x.

Key points:
- Prefect 3.x does not have a distinct “Skipped” state. If you want to indicate “did not run,” use `Cancelled`.
- Returning a Prefect `State` from a task or flow sets that run’s final state.
- You can use `name`/`message` on the state to make the reason clear in the UI.

Examples
- Skip a single task
```python
from prefect import task, flow
from prefect.states import Cancelled
from prefect.exceptions import CancelledRun

@task
def maybe_skip(do_skip: bool):
    if do_skip:
        return Cancelled(name="SKIPPED", message="Condition not met")
    return "ok"

@flow
def my_flow():
    try:
        result = maybe_skip(True)
    except CancelledRun:
        # Optional: swallow the cancellation so the flow can continue
        result = None
    print("continuing...")
```
- Skip the entire flow
```python
from prefect import flow
from prefect.states import Cancelled

@flow
def my_flow():
    return Cancelled(name="SKIPPED", message="Whole flow skipped")
```
- If you prefer the UI to stay green but still “no-op,” you can mark the task Completed with a message
```python
from prefect import task
from prefect.states import Completed

@task
def no_op_but_green():
    return Completed(name="SKIPPED", message="Intentionally skipped")
```
Docs:
- State API in Prefect 3.x: prefect.states

Quick check: do you want downstream logic to continue when this task is “skipped,” and what state do you want the overall flow to end in (Completed vs Cancelled)? I can tailor the pattern accordingly.
k
@Marvin but why is the same logic not working here?
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time
from prefect.utilities.annotations import allow_failure
#from prefect.states import State, Cancelled, get_state_result
from prefect.states import Cancelled

@task(log_prints=True)
def skipping_task(task_name):
    return Cancelled(message=f'skipping task {task_name}', name="SKIPPING")

# -------------------------
# Define Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name, previous_task=None, previous_state=None, delay=20):
    print(f"previous task is {previous_task}")
    if previous_state is not None:
        if previous_task is not None:
            #if not "done" in previous_state:
            if previous_state != f"{previous_task} done":
                print("reached")
                # return Cancelled(message=f"skipping {name}: upstream not completed", name="SKIPPING")
                #return Cancelled(message=f'skipping task {name}' , name="SKIPPING")
                skipping_task(name)
                #return Cancelled(message=f"skipping", name="SKIPPING")
    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"

# -------------------------
# Flow Definition
# -------------------------
@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def client_flow():
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM will fail
        ["PS_LoanAccountSweeper"],
        "PS_Reporting",  # final task
    ]

    # Schedule all groups except the last one
    last_futures_of_prior_groups = []
    for group in client_tasks[:-1]:
        print(f"current group is {group}")
        # First task in the group: starts immediately
        prev = ps_task.with_options(task_run_name=group[0]).submit(group[0])
        # Chain the rest of the tasks in this group
        for t in group[1:]:
            current_task_index_in_group = group.index(t)
            previous_task = group[current_task_index_in_group - 1]
            prev = ps_task.with_options(task_run_name=t).submit(
                t, previous_task=previous_task,
                previous_state=allow_failure(prev),  # pass upstream State into the param
                wait_for=[allow_failure(prev)],  # don't auto-cancel; let the task decide
            )
        last_futures_of_prior_groups.append(prev)

    # Schedule the last task to start only after all prior groups have settled
    last_group = client_tasks[-1]
    final_future = ps_task.with_options(task_run_name=last_group).submit(
        last_group,
        wait_for=last_futures_of_prior_groups,  # here we just wait; we don't need the upstream states
    )

    # Wait for the entire flow to finish by resolving the last future
    return final_future.result()

if __name__ == "__main__":
    client_flow()
```
m
thought for 813.5 seconds
Thanks for sharing the full snippet. A couple of things are causing the “same logic” not to work here:

What’s going wrong
- You’re calling `skipping_task(name)` inside `ps_task` but not returning its state. That creates a separate child task that goes Cancelled, while `ps_task` continues to run. To “skip” the current task, return a `Cancelled` state from the current task itself.
- `previous_state` is not a Prefect `State` here. Per the `allow_failure` docstring, a parameter wrapped in `allow_failure(prev)` receives the upstream return value, or the attached exception if the upstream run failed. Comparing it to a string like `f"{previous_task} done"` is a valid check, but handle the exception case explicitly so the skip reason is clear.
- For dependencies that should not auto-cancel downstream tasks, you must wrap them with `allow_failure` in both:
  - the parameter you pass (you did this), and
  - the `wait_for` list (you did this within the group, but not for the final task).

Fixes
1) Return Cancelled from the current task (don’t call another task to skip)
2) Handle the upstream value correctly (result or exception)
3) Make the final `wait_for` soft if you want it to run regardless of prior failures

Concrete changes
- Update your task to check the upstream value and return Cancelled when you want to skip:
```python
import time

from prefect import task
from prefect.states import Cancelled

@task(log_prints=True)
def ps_task(name, previous_task=None, previous_state=None, delay=20):
    print(f"previous task is {previous_task}")

    if previous_task is not None:
        # With allow_failure, a failed upstream arrives as its exception
        if isinstance(previous_state, BaseException):
            return Cancelled(
                name="SKIPPING",
                message=f"skipping {name}: upstream failed with {previous_state!r}",
            )

        # Otherwise previous_state is the upstream return value
        if previous_state != f"{previous_task} done":
            return Cancelled(
                name="SKIPPING",
                message=f"skipping {name}: unexpected upstream result={previous_state!r}",
            )

    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"
```
- Keep using `allow_failure(prev)` for both the parameter and the dependency within groups (you already do that).
- If you want the final task to run even if earlier groups failed/cancelled, also mark those `wait_for` dependencies as allow-failure:
```python
final_future = ps_task.with_options(task_run_name=last_group).submit(
    last_group,
    wait_for=[allow_failure(f) for f in last_futures_of_prior_groups],
)
```
- If you don’t want the flow to error when the final task is Cancelled or Failed, wrap the resolution:
```python
from prefect import flow
from prefect.exceptions import CancelledRun, FailedRun
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def client_flow():
    # ... schedule all tasks as you have, ending with final_future ...
    try:
        return final_future.result()
    except (CancelledRun, FailedRun) as exc:
        print(f"flow finishing despite final task state: {type(exc).__name__}")
        return None
```
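The skip decision itself can also live in a plain function, which makes it testable without running a flow. This is a minimal sketch, not Prefect API: `skip_reason` is a hypothetical helper, and it assumes what the `allow_failure` docstring describes, namely that the annotated parameter arrives as the upstream return value, or as the attached exception when the upstream run failed.

```python
from typing import Any, Optional


def skip_reason(name: str, previous_task: Optional[str], upstream: Any) -> Optional[str]:
    """Return a message explaining why `name` should be skipped, or None to run it."""
    if previous_task is None:
        # First task in a group: nothing upstream to check
        return None
    if isinstance(upstream, BaseException):
        # Assumption: a failed upstream is delivered as its exception
        return f"skipping {name}: {previous_task} failed with {upstream!r}"
    if upstream != f"{previous_task} done":
        return f"skipping {name}: unexpected upstream result={upstream!r}"
    return None


print(skip_reason("PS_MTM", "PS_Generate", "PS_Generate done"))
print(skip_reason("PS_Sweeping", "PS_MTM", Exception("custom failure")))
```

Inside `ps_task`, a non-None reason would then be turned into `return Cancelled(name="SKIPPING", message=reason)`, so the task that makes the decision is also the one that ends up Cancelled.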
Reference
- State API (returning `Cancelled`, inspecting `state.type`, extracting results): prefect.states

If you share how you want the final task to behave when earlier groups fail/cancel (run anyway vs auto-cancel), I can tailor the `wait_for` usage exactly.