# ask-community
h
Hello, I wonder how a task could process the result of a failed upstream task. Please see the example below: I have a `task_a`, and a `task_b` that takes `task_a()` as its upstream task. `task_b` always runs and processes the `task_a` result, even when `task_a` fails.
# Prefect 1.x-style imports (assumed; the original snippet omitted them)
from random import random

from prefect import Flow, task, triggers
from prefect.engine import signals


@task
def task_a(threshold: float):
    result = random()
    if result > threshold:
        return result
    else:
        # question 1: how do I get task_a to return the result of random() even when the FAIL signal is raised?
        raise signals.FAIL("failed, result is less than the threshold")


@task(trigger=triggers.always_run)
def task_b(result_a: float):
    # question 2: how could task_b check whether task_a succeeded or failed?
    if result_a.is_failed():  # ???
        print('task_a failed with result {}'.format(result_a))
    else:
        print('task_a succeeds with result {}'.format(result_a))


with Flow('task_b always handles task_a failure') as flow:
    result_a = task_a(0.3)
    result_b = task_b(result_a)
I have two questions:
1. How do I get `task_a` to return the result of `random()` even when the `FAIL` signal is raised?
2. How does `task_b` check whether `task_a` succeeded or failed?
c
Hi Hui,
1. You can specify a `result` in the `FAIL` signal: `FAIL("message", result=result)`
2. State checks always need to occur in either the state handler for the task or the trigger for the task; otherwise, you should create a conditional branch of tasks that respond to the states individually. In general, if you want different logic to run based on state, you should encapsulate that in the structure of the Flow, not within an individual task.
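To make point 1 concrete, here is a minimal, library-free sketch of the idea that a failure signal can carry a result. `FailSignal`, `run_task`, and the `draw` parameter are hypothetical stand-ins introduced for illustration, not Prefect's API; in Prefect itself the relevant call is the `FAIL("message", result=result)` form mentioned above.

```python
from random import random


class FailSignal(Exception):
    """Hypothetical stand-in for a failure signal that carries a result."""

    def __init__(self, message, result=None):
        super().__init__(message)
        self.result = result


def task_a(threshold, draw=random):
    # `draw` is an illustrative hook so the random value can be fixed.
    result = draw()
    if result > threshold:
        return result
    # Attach the computed value so it is not lost when the task fails.
    raise FailSignal("failed, result is less than the threshold", result=result)


def run_task(fn, *args, **kwargs):
    """Hypothetical runner: returns (failed, result) instead of propagating the signal."""
    try:
        return False, fn(*args, **kwargs)
    except FailSignal as signal:
        return True, signal.result


failed, value = run_task(task_a, 0.3, draw=lambda: 0.1)
print(failed, value)
```

The point of the sketch is only that the value computed before the failure travels with the signal, so whatever handles the failure can still read it.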
h
thank you, Chris.
Could you give me an example of how to do a state check in the trigger for a task?
c
Sure:
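def my_trigger(upstream_states: dict) -> bool:
    # upstream_states maps each incoming edge to the state of its upstream task
    for edge, state in upstream_states.items():
        if edge.key == "result_a":
            if state.is_failed():
                ...  # etc.
    return True  # True means the task is allowed to run

@task(trigger=my_trigger)
def task_b(result_a):
    ...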
Note that triggers are responsible for one thing: determining whether the task should run or not, so they might not be what you’re looking for.
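As a rough illustration of that last point, a trigger is just a predicate over the upstream states. The sketch below uses hypothetical stand-in classes (`StubEdge`, `StubState`) instead of Prefect's own objects so that it runs without Prefect; only the `upstream_states` mapping and the `edge.key` / `state.is_failed()` calls mirror the snippet above, and `run_only_if_a_failed` is an illustrative name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StubEdge:
    """Hypothetical stand-in for an edge; `key` is the downstream argument name."""
    key: str


@dataclass(frozen=True)
class StubState:
    """Hypothetical stand-in for an upstream task state."""
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


def run_only_if_a_failed(upstream_states: dict) -> bool:
    """Return True only when the upstream bound to `result_a` failed."""
    for edge, state in upstream_states.items():
        if edge.key == "result_a":
            return state.is_failed()
    return False


print(run_only_if_a_failed({StubEdge("result_a"): StubState(failed=True)}))
```

The function only answers "run or skip"; it has no way to hand different data to the task body, which is why a trigger may not fit the use case in this thread.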
h
I got it, yes, it might not be what I am looking for but thank you for showing.
Hi @Chris White, could I change the result of the task in its state handler? For example
def return_error_if_trigger_failed(task, old_state, new_state):
    if isinstance(new_state, state.TriggerFailed):
        new_state.result = {'status':"ERROR"}
    return new_state

task_result = ParseDBTLogTask(state_handlers=[return_error_if_trigger_failed])
Hi Chris, the reason I want to handle both the success and the failure of task_a in a single task_b is that the two cases share common logic. Creating a conditional branch of tasks to handle the different outcomes of task_a seems redundant and overcomplicated. I found a solution: checking for the FAIL of task_a inside task_b. I'm not sure whether it violates any rules or conflicts with best practice, but it works for me for now.
# Prefect 1.x-style imports (assumed; the original snippet omitted them)
from random import random

from prefect import Flow, task, triggers
from prefect.engine import signals


@task
def task_a(threshold: float):
    result = random()
    if result > threshold:
        return result
    else:
        raise signals.FAIL("failed, result is less than the threshold", result=result)


@task(trigger=triggers.always_run)
def task_b(result_a: float):
    if isinstance(result_a, signals.FAIL):  # answer 2: check if task_a failed
        print('task_a failed with result {}'.format(result_a))
    else:
        print('task_a succeeds with result {}'.format(result_a))


with Flow('task_b always handles task_a failure') as flow:
    result_a = task_a(0.3)
    result_b = task_b(result_a)
@Dylan @Kyle Moon-Wright thanks for chatting