<@ULVA73B9P> from prefect import task,flow from...
# ask-community
k
@Marvin from prefect import task,flow from prefect.states import Cancelled @task(log_prints=True) def skipping_task(): return Cancelled(message=f"skipping", name="SKIPPING") @flow(log_prints=True) def my_flow(): skipping_task() my_flow() skipping is working here, but not working in this code from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure #from prefect.states import State, Cancelled, get_state_result from prefect.states import Cancelled @task(log_prints=True) def skipping_task(task_name): return Cancelled(message=f'skipping task {task_name}' , name="SKIPPING") # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_task=None,previous_state=None, delay=20): # # Handle upstream info defensively: it might be a State, a resolved value, or even an exception # if previous_state is not None: # print(f"[{name}] previous_state: {previous_state!r} (type={type(previous_state)})") # if isinstance(previous_state, State): # # We received the upstream State (expected when using allow_failure on the param) # if not previous_state.is_completed(): # return Cancelled(message=f"skipping {name}: upstream not completed") # # If you need the upstream result value: # try: # upstream_value = get_state_result(previous_state) # print(f"[{name}] upstream result: {upstream_value!r}") # except Exception as exc: # # If the upstream failed or its result cannot be read, decide what to do # return Cancelled(message=f"skipping {name}: could not read upstream result: {exc!r}") # else: # # We did not receive a State. Prefect resolved the value before passing it. # # At this point, it's already "successful" from Prefect's perspective. # # If it's an exception instance, treat it as failure; if it's a normal value, continue. # if isinstance(previous_state, BaseException): # return Cancelled(message=f"skipping {name}: upstream exception {previous_state!r}") # else: proceed as success print(f"previous task is {previous_task}") if previous_state is not None: if previous_task is not None: #if not "done" in previous_state: if previous_state!=f"{previous_task} done": print("reached") # return Cancelled(message=f"skipping {name}: upstream not completed", name="SKIPPING") #return Cancelled(message=f'skipping task {name}' , name="SKIPPING") #skipping_task(name) return Cancelled(message=f"skipping", name="SKIPPING") print(f"Running {name}") if name == "PS_MTM": raise Exception("custom failure") time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # PS_MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # First task in the group — starts immediately prev = ps_task.with_options(task_run_name=group[0]).submit(group[0]) # Chain the rest of the tasks in this group for t in group[1] current_task_index_in_group=group.index(t) previous_task=group[current_task_index_in_group-1] prev = ps_task.with_options(task_run_name=t).submit( t,previous_task=previous_task, previous_state=allow_failure(prev), # pass upstream State into the param wait_for=[allow_failure(prev)], # don't auto-cancel; let the task decide ) last_futures_of_prior_groups.append(prev) # Schedule the last task to start only after all prior groups have settled last_group = client_tasks[-1] final_future = ps_task.with_options(task_run_name=last_group).submit( last_group, wait_for=last_futures_of_prior_groups, # here we just wait; we don't need the upstream states ) # Wait for the entire flow to finish by resolving the last future return final_future.result() if name == "__main__": client_flow() why
m
Your message was too long, here's your message at the allowed limit: from prefect import task flow from prefect states import Cancelled @task(log_prints=True) def skipping_task() return Cancelled(message=f skipping name= SKIPPING ) @flow(log_prints=True) def my_flow() skipping_task() my_flow() skipping is working here but not working in this code from prefect import flow task from prefect task_runners import ConcurrentTaskRunner import time from prefect utilities annotations import allow_failure #from prefect states import State Cancelled get_state_result from prefect states import Cancelled @task(log_prints=True) def skipping_task(task_name) return Cancelled(message=f'skipping task {task_name}' name= SKIPPING ) # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name previous_task=None previous_state=None delay=20) # # Handle upstream info defensively it might be a State a resolved value or even an exception # if previous_state is not None # print(f [{name}] previous_state {previous_state!r} (type={type(previous_state)}) ) # if isinstance(previous_state State) # # We received the upstream State (expected when using allow_failure on the param) # if not previous_state is_completed() # return Cancelled(message=f skipping {name} upstream not completed ) # # If you need the upstream result value # try # upstream_value = get_state_result(previous_state) # print(f [{name}] upstream result {upstream_value!r} ) # except Exception as exc # # If the upstream failed or its result cannot be read decide what to do # return Cancelled(message=f skipping {name} could not read upstream result {exc!r} ) # else # # We did not receive a State Prefect resolved the value before passing it # # At this point it's already successful from Prefect's perspective # # If it's an exception instance treat it as failure; if it's a normal value continue # if isinstance(previous_state BaseException) # return Cancelled(message=f skipping {name} upstream exception {previous_state!r} ) # else proceed as success print(f previous task is {previous_task} ) if previous_state is not None if previous_task is not None #if not done in previous_state if previous_state!=f {previous_task} done print( reached ) # return Cancelled(message=f skipping {name} upstream not completed name= SKIPPING ) #return Cancelled(message=f'skipping task {name}' name= SKIPPING ) #skipping_task(name) return Cancelled(message=f skipping name= SKIPPING ) print(f Running {name} ) if name == PS_MTM raise Exception( custom failure ) time sleep(delay) return f {name} done # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner() log_prints=True) def client_flow() client_tasks = [ [ PS_Extend_General_PSwaps PS_TPLSweep_General_PSwaps ] [ PS_Generate PS_MTM PS_Sweeping ] # PS_MTM will fail [ PS_LoanAccountSweeper ] PS_Reporting # final task ] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ -1] print(f current group is {group} ) # First task in the group — starts immediately prev = ps_task with_options(task_run_name=group[0]) submit(group[0]) # Chain the rest of the tasks in this group for t in group[1 ] current_task_index_in_group=group index(t) previous_task=group[current_task_index_in_group-1] prev = ps_task with_options(task_run_name=t) submit( t previous_task=previous_task previous_state=allow_failure(prev) # pass upstream State into the param wait_for=[allow_failure(prev)] # don't auto-cancel; let the task decide ) last_futures_of_prior_groups append(prev) # Schedule the last task to start only after all prior groups have settled last_group = client_tasks[-1] final_future = ps_task with_options(task_run_name=last_group) submit( last_group wait_for=last_futures_of_prior_groups # here we just wait; we don't need the upstream states ) # Wait for the entire flow to finish by