Chris O'Brien
11/21/2019, 2:09 AM
@task
def condition():
    if previous_task.Signal == 'FAIL':
        return False
    else:
        return True

with Flow('test') as test:
    output = task_1()  # Fails with Exception
    switch(condition, {True: yay_flow, False: boo_flow})
Or am I approaching this wrong?
Chris White
11/21/2019, 2:12 AM
from prefect import task, Flow
from prefect.triggers import any_failed

@task
def raise_error():
    raise ValueError()

@task(trigger=any_failed)
def condition(exc):
    if isinstance(exc, ValueError):
        return True
    else:
        return False

with Flow("example") as flow:
    cond = condition(raise_error)

flow_state = flow.run()
flow_state.result[cond].result # True
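To make the mechanics of the trigger example above a bit more concrete, here is a minimal plain-Python sketch of the idea, with no Prefect import. `FakeState`, `run_task`, and `any_failed_sketch` are hypothetical stand-ins rather than Prefect's API; the sketch assumes that a trigger inspects the states of the direct upstream tasks and that a failed state carries the raised exception as its result.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class FakeState:
    """Hypothetical stand-in for a task state (not a Prefect class)."""
    failed: bool
    result: Any = None


def run_task(fn: Callable[[], Any]) -> FakeState:
    """Run fn; on an exception, store it as the result of a failed state."""
    try:
        return FakeState(failed=False, result=fn())
    except Exception as exc:
        return FakeState(failed=True, result=exc)


def any_failed_sketch(upstream: List[FakeState]) -> bool:
    """Assumed trigger rule: run only if at least one upstream state failed."""
    return any(state.failed for state in upstream)


def raise_error():
    raise ValueError()


def condition(exc: Any) -> bool:
    return isinstance(exc, ValueError)


upstream_state = run_task(raise_error)

if any_failed_sketch([upstream_state]):
    # The failed upstream's exception is what the downstream task receives.
    cond_result = condition(upstream_state.result)
else:
    cond_result = None  # the downstream task would not run

print(cond_result)  # True
```

The point of the sketch is only that the exception travels downstream as data, so the downstream task can branch on its type.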
Alternatively, you could use a state_handler around the task that you expect to raise the signal; this state handler might look like:
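from prefect.engine.state import Success

def state_handler(task, old, new):
    # A failure caused by ValueError becomes Success(True); anything else becomes Success(False)
    if new.is_failed() and isinstance(new.result, ValueError):
        return Success(result=True)
    else:
        return Success(result=False)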
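As a rough illustration of what the handler above does with different incoming states, here is a plain-Python sketch that calls the same logic by hand. `FakeState` is a hypothetical stand-in for Prefect's state classes (it plays the role of both the failed state and `Success`), and passing `None` for the task is only for demonstration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state (not the real class)."""
    failed: bool
    result: Any = None

    def is_failed(self) -> bool:
        return self.failed


def state_handler(task, old, new):
    # Same branching as the handler above, with FakeState in place of Success.
    if new.is_failed() and isinstance(new.result, ValueError):
        return FakeState(failed=False, result=True)
    else:
        return FakeState(failed=False, result=False)


running = FakeState(failed=False)

# A failure caused by ValueError is rewritten to a successful state with result True.
after_value_error = state_handler(None, running, FakeState(True, ValueError()))

# Any other transition is rewritten to a successful state with result False.
after_key_error = state_handler(None, running, FakeState(True, KeyError()))

print(after_value_error.result, after_key_error.result)  # True False
```

Because the `else` branch is unconditional, a handler that is invoked on every transition would also rewrite states that are not finished yet; returning `new` unchanged for those states is one way to avoid that.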
Chris O'Brien
11/21/2019, 4:38 AM
With some_failed(1,1) I would have expected the task cond not to trigger (as pre_error should count as one failure and raise_error as a second), unless I am not interpreting its usage correctly?
from prefect import task, Flow
from prefect.triggers import any_failed, some_failed
from prefect.tasks.control_flow.conditional import ifelse, merge

@task
def pre_error():
    print("i pass")
    raise ValueError()

@task
def raise_error():
    raise ValueError()

@task(trigger=some_failed(1,2))
def condition(exc):
    if isinstance(exc, ValueError):
        return True
    else:
        return False

@task
def passed():
    print("yay")

@task
def failed():
    print("boo")

@task
def do_final_thing():
    print("final")

with Flow("example") as flow:
    raise_error.set_upstream(pre_error)
    ifelse(condition(raise_error), passed, failed)
    merged_result = merge(passed, failed)
    do_final_thing.set_upstream(merged_result)

flow_state = flow.run()
flow.visualize(flow_state=flow_state)
Chris White
11/21/2019, 3:37 PM
For condition to be affected by pre_error, you'll need to actually tell Prefect that you intend for it to be a dependency; in this case, you could do condition(raise_error, upstream_tasks=[pre_error])
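The dependency point can be illustrated with a small plain-Python model of a count-based trigger. `some_failed_sketch` is a hypothetical simplification, not Prefect's implementation: it assumes integer bounds only, takes a dict saying which direct upstream tasks failed, and returns a boolean where the real trigger would raise a signal.

```python
from typing import Callable, Dict, Optional


def some_failed_sketch(
    at_least: Optional[int] = None, at_most: Optional[int] = None
) -> Callable[[Dict[str, bool]], bool]:
    """Build a trigger that passes when the failed count is within the bounds."""

    def trigger(upstream_failed: Dict[str, bool]) -> bool:
        # Only the tasks listed here (the direct upstreams) are counted.
        num_failed = sum(upstream_failed.values())
        low = 0 if at_least is None else at_least
        high = len(upstream_failed) if at_most is None else at_most
        return low <= num_failed <= high

    return trigger


trigger = some_failed_sketch(1, 1)

# condition(raise_error): raise_error is the only direct upstream, so one failure is seen.
only_direct = trigger({"raise_error": True})

# condition(raise_error, upstream_tasks=[pre_error]): both failures are now counted.
with_pre_error = trigger({"raise_error": True, "pre_error": True})

print(only_direct, with_pre_error)  # True False
```

Under these assumptions, a failure further up the graph does not count toward the bounds until that task is declared as a direct upstream.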