# @Marvin
from prefect import task, flow
from prefect.states import Cancelled


@task(log_prints=True)
def skipping_task():
    """Return a Cancelled state named "SKIPPING" as this task's result."""
    return Cancelled(message="skipping", name="SKIPPING")


@flow(log_prints=True)
def my_flow():
    """Minimal flow that calls `skipping_task` directly (no `.submit`)."""
    skipping_task()


my_flow()

# skipping is working here, but not working in this code

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time
from prefect.utilities.annotations import allow_failure
#from prefect.states import State, Cancelled, get_state_result
from prefect.states import Cancelled


@task(log_prints=True)
def skipping_task(task_name):
    """Return a Cancelled state named "SKIPPING" whose message names `task_name`."""
    return Cancelled(message=f'skipping task {task_name}', name="SKIPPING")


# -------------------------
# Define Tasks
# -------------------------
@task(log_prints=True)
def ps_task(name, previous_task=None, previous_state=None, delay=20):
    """Run one named step, or skip it when the upstream value is not the expected one.

    Args:
        name: Label of this step; used in log lines and in the return value.
        previous_task: Name of the step chained directly before this one,
            or None for the first step of a group.
        previous_state: Whatever the flow passed for the upstream step
            (the flow wraps the upstream future in `allow_failure`).
            NOTE(review): whether this arrives as a result string, an
            exception instance or a State depends on Prefect's input
            resolution -- confirm for the installed version.
        delay: Seconds to sleep to simulate work.

    Returns:
        The string "<name> done" on success, or a Cancelled state named
        "SKIPPING" when `previous_state` is given and differs from
        "<previous_task> done".

    Raises:
        Exception: Always, when `name` is "PS_MTM" (deliberate demo failure).
    """
    # # Handle upstream info defensively: it might be a State, a resolved value, or even an exception
    # if previous_state is not None:
    #     print(f"[{name}] previous_state: {previous_state!r} (type={type(previous_state)})")
    #     if isinstance(previous_state, State):
    #         # We received the upstream State (expected when using allow_failure on the param)
    #         if not previous_state.is_completed():
    #             return Cancelled(message=f"skipping {name}: upstream not completed")
    #         # If you need the upstream result value:
    #         try:
    #             upstream_value = get_state_result(previous_state)
    #             print(f"[{name}] upstream result: {upstream_value!r}")
    #         except Exception as exc:
    #             # If the upstream failed or its result cannot be read, decide what to do
    #             return Cancelled(message=f"skipping {name}: could not read upstream result: {exc!r}")
    #     else:
    #         # We did not receive a State. Prefect resolved the value before passing it.
    #         # At this point, it's already "successful" from Prefect's perspective.
    #         # If it's an exception instance, treat it as failure; if it's a normal value, continue.
    #         if isinstance(previous_state, BaseException):
    #             return Cancelled(message=f"skipping {name}: upstream exception {previous_state!r}")
    #         # else: proceed as success
    print(f"previous task is {previous_task}")

    # Skip only when we were told about an upstream step and what it handed
    # us is not exactly its success string.
    has_upstream = previous_state is not None and previous_task is not None
    #if not "done" in previous_state:
    if has_upstream and previous_state != f"{previous_task} done":
        print("reached")
        # return Cancelled(message=f"skipping {name}: upstream not completed", name="SKIPPING")
        #return Cancelled(message=f'skipping task {name}' , name="SKIPPING")
        #skipping_task(name)
        return Cancelled(message="skipping", name="SKIPPING")

    print(f"Running {name}")
    if name == "PS_MTM":
        raise Exception("custom failure")
    time.sleep(delay)
    return f"{name} done"


# -------------------------
# Flow Definition
# -------------------------
@flow(task_runner=ConcurrentTaskRunner(), log_prints=True)
def client_flow():
    """Submit each group as a sequential chain, then run the final task.

    Every entry of `client_tasks` except the last is a list of step names
    that are chained one after another; the groups themselves are submitted
    without waiting on each other. The last entry is a single step that
    waits for the final future of every group.

    Returns:
        The result of the final task's future.
    """
    client_tasks = [
        ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"],
        ["PS_Generate", "PS_MTM", "PS_Sweeping"],  # PS_MTM will fail
        ["PS_LoanAccountSweeper"],
        "PS_Reporting",  # final task
    ]

    # Schedule all groups except the last one
    last_futures_of_prior_groups = []
    for group in client_tasks[:-1]:
        print(f"current group is {group}")

        # First task in the group — starts immediately
        prev = ps_task.with_options(task_run_name=group[0]).submit(group[0])

        # Chain the rest of the tasks in this group. Pairing each task with
        # its predecessor by position avoids a list.index() lookup, which
        # would pick the wrong predecessor if a name appeared twice.
        for previous_task, t in zip(group, group[1:]):
            prev = ps_task.with_options(task_run_name=t).submit(
                t,
                previous_task=previous_task,
                previous_state=allow_failure(prev),  # pass upstream State into the param
                wait_for=[allow_failure(prev)],  # don't auto-cancel; let the task decide
            )

        last_futures_of_prior_groups.append(prev)

    # Schedule the last task to start only after all prior groups have settled
    last_group = client_tasks[-1]
    # NOTE(review): unlike the in-group chaining above, these futures are not
    # wrapped in allow_failure -- confirm the final task is still expected to
    # run when a group's last task failed or returned the SKIPPING state.
    final_future = ps_task.with_options(task_run_name=last_group).submit(
        last_group,
        wait_for=last_futures_of_prior_groups,  # here we just wait; we don't need the upstream states
    )

    # Wait for the entire flow to finish by resolving the last future
    return final_future.result()


if __name__ == "__main__":
    client_flow()

# why