Hui Zheng
11/04/2020, 7:15 PM
I have a task_a, and then I have a task_b which takes task_a() as its upstream task. task_b always runs and processes the task_a result, even when task_a fails.
from random import random

from prefect import Flow, task, triggers
from prefect.engine import signals


@task
def task_a(threshold: float):
    result = random()
    if result > threshold:
        return result
    else:
        # question 1: how do I get task_a to return the result of random() even when the FAIL signal is raised?
        raise signals.FAIL("failed, result is less than the threshold")


@task(trigger=triggers.always_run)
def task_b(result_a: float):
    # question 2: how could task_b check if task_a is a success or failure?
    if result_a.is_failed():  # ???
        print('task_a failed with result {}'.format(result_a))
    else:
        print('task_a succeeds with result {}'.format(result_a))


with Flow('task_b always handles task_a failure') as flow:
    result_a = task_a(0.3)
    result_b = task_b(result_a)
I have two questions:
1. How do I get task_a to return the result of random() even when the FAIL signal is raised?
2. How does task_b check whether task_a is a success or a failure?
Chris White
1. Include the result in the FAIL signal: FAIL("message", result=result)
2. State checks always need to occur in the state handler for the task, in the trigger for the task, or in a conditional branch of tasks that respond to the states individually. In general, if you want different logic to run based on state, you should encapsulate that in the structure of the Flow, not within an individual task.
Hui Zheng
11/04/2020, 7:39 PM
Hui Zheng
11/04/2020, 7:40 PM
Chris White
def my_trigger(upstream_states: dict) -> bool:
    for edge, state in upstream_states.items():
        if edge.key == "result_a":
            if state.is_failed():  # etc
                ...
    return True


@task(trigger=my_trigger)
def task_b(result_a):
    …
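To make the trigger contract above concrete without Prefect installed, here is a minimal sketch built on stand-in classes. `FakeEdge`, `FakeState`, and `run_only_if_a_failed` are hypothetical names that only mimic the two things the trigger touches (`edge.key` and `state.is_failed()`); they are not Prefect's real classes, and the "run only when `result_a` failed" policy is just one possible choice.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeEdge:
    """Hypothetical stand-in for an edge; only `key` is modeled."""
    key: str


@dataclass
class FakeState:
    """Hypothetical stand-in for an upstream task state."""
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


def run_only_if_a_failed(upstream_states: dict) -> bool:
    """Decide whether the downstream task should run at all."""
    for edge, state in upstream_states.items():
        if edge.key == "result_a":
            # Run only when the upstream behind `result_a` failed.
            return state.is_failed()
    # No matching upstream edge: do not run.
    return False


if __name__ == "__main__":
    print(run_only_if_a_failed({FakeEdge("result_a"): FakeState(failed=True)}))
    print(run_only_if_a_failed({FakeEdge("result_a"): FakeState(failed=False)}))
```

The function only answers "run or skip"; it has no way to hand extra information to the task body.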
Note that triggers are responsible for one thing: determining whether the task should run or not, so they might not be what you’re looking for.
Hui Zheng
11/05/2020, 1:05 AM
Hui Zheng
11/10/2020, 12:28 AM
from prefect.engine import state


def return_error_if_trigger_failed(task, old_state, new_state):
    # When the trigger fails, attach a placeholder result to the new state
    if isinstance(new_state, state.TriggerFailed):
        new_state.result = {'status': "ERROR"}
    return new_state


task_result = ParseDBTLogTask(state_handlers=[return_error_if_trigger_failed])
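The handler above follows a simple pattern: a callback receives the task, the old state, and the new state, and returns the state to keep. A Prefect-free sketch of that pattern might look like the following. `State` and `apply_handlers` are hypothetical stand-ins written for illustration, and the state is matched by name here rather than by class; this is not how Prefect itself is implemented.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class State:
    """Hypothetical stand-in for a task state."""
    name: str
    result: Any = None


def return_error_if_trigger_failed(task: Any, old_state: State, new_state: State) -> State:
    # Replace the result with an error marker when the trigger failed.
    if new_state.name == "TriggerFailed":
        new_state.result = {"status": "ERROR"}
    return new_state


def apply_handlers(
    task: Any,
    old_state: State,
    new_state: State,
    handlers: List[Callable[[Any, State, State], State]],
) -> State:
    """Pass the new state through each handler in order (illustrative only)."""
    for handler in handlers:
        new_state = handler(task, old_state, new_state)
    return new_state


if __name__ == "__main__":
    final = apply_handlers(
        None, State("Pending"), State("TriggerFailed"), [return_error_if_trigger_failed]
    )
    print(final.result)
```

Because the handler always returns a state, downstream consumers see a well-defined result even when the task body never ran.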
Hui Zheng
11/10/2020, 2:15 AM
from random import random

from prefect import Flow, task, triggers
from prefect.engine import signals


@task
def task_a(threshold: float):
    result = random()
    if result > threshold:
        return result
    else:
        raise signals.FAIL("failed, result is less than the threshold", result=result)


@task(trigger=triggers.always_run)
def task_b(result_a: float):
    if isinstance(result_a, signals.FAIL):  # answer 2: check if task_a fails
        print('task_a failed with result {}'.format(result_a))
    else:
        print('task_a succeeds with result {}'.format(result_a))


with Flow('task_b always handles task_a failure') as flow:
    result_a = task_a(0.3)
    result_b = task_b(result_a)
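The underlying idea in this thread, a failure signal that still carries the computed value, can be sketched in plain Python. Everything here is an assumption made for illustration: `Fail` is a hypothetical exception class, `run` is a hypothetical helper that records success or failure alongside the value, and passing the outcome to `task_b` explicitly is a design choice of this sketch, not something Prefect does.

```python
import random
from typing import Any, Callable, Tuple


class Fail(Exception):
    """Hypothetical failure signal that carries a result."""

    def __init__(self, message: str, result: Any = None):
        super().__init__(message)
        self.result = result


def task_a(threshold: float, rng: random.Random) -> float:
    result = rng.random()
    if result > threshold:
        return result
    # Attach the computed value so it survives the failure.
    raise Fail("failed, result is less than the threshold", result=result)


def run(fn: Callable[..., Any], *args: Any) -> Tuple[bool, Any]:
    """Run fn and return (succeeded, result), keeping the result on failure."""
    try:
        return True, fn(*args)
    except Fail as exc:
        return False, exc.result


def task_b(succeeded: bool, result_a: float) -> str:
    if succeeded:
        return "task_a succeeds with result {}".format(result_a)
    return "task_a failed with result {}".format(result_a)


if __name__ == "__main__":
    ok, value = run(task_a, 0.3, random.Random())
    print(task_b(ok, value))
```

Keeping the success flag separate from the value means the downstream step never has to infer the outcome from the type of the value it receives.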
Hui Zheng
11/10/2020, 10:20 PM