https://prefect.io logo
c

Chris O'Brien

11/21/2019, 2:09 AM
Hi all, what’s the best way to implement a conditional on a Task Signal? Where if a task throws an exception that’s unhandled (so is FAIL) we pick one or another branch?
Copy code
@task
def condition():
  if previous_task.Signal == 'FAIL':
    return False
  else:
    return True

with Flow('test') as test:
    output = task_1() #Fails with Exception
    switch(condition, {True: yay_flow, False: boo_flow})
Or am I approaching this wrong?
c

Chris White

11/21/2019, 2:12 AM
There are a few ways you might do this; in keeping with your sketch above you could consider:
Copy code
from prefect import task, Flow
from prefect.triggers import any_failed

@task
def raise_error():
    raise ValueError()

@task(trigger=any_failed)
def condition(exc):
    if isinstance(exc, ValueError):
        return True
    else:
        return False

with Flow("example") as flow:
    cond = condition(raise_error)

flow_state = flow.run()
flow_state.result[cond].result # True
one of the cool things about prefect is that exceptions are stored as the result of your task, so downstream tasks can actually receive those exceptions and act on them
😍 1
You could also consider implementing a
state_handler
around the task that you expect to raise the signal; this state handler might look like:
Copy code
def state_handler(task, old, new):
    if new.is_failed() and isinstance(new.result, ValueError):
        return Success(result=True)
    else:
        return Success(result=False)
this pattern allows you to react / adjust your task’s state in a more localized way than the first, and would probably be better if you were running on Dask (since exceptions can sometimes fail to serialize)
👍 1
c

Chris O'Brien

11/21/2019, 4:38 AM
So I have been having a play and came across something I might be misinterpreting With a
some_failed(1,1)
I would have expected to have the task
cond
not trigger (as the
pre_error
should count as one and then the
raise_error
a second) unless I am not interpreting its usage correctly?
Copy code
from prefect import task, Flow
from prefect.triggers import any_failed, some_failed
from prefect.tasks.control_flow.conditional import ifelse, merge

@task
def pre_error():
    print("i pass")
    raise ValueError()

@task
def raise_error():
    raise ValueError()

@task(trigger=some_failed(1,2))
def condition(exc):
    if isinstance(exc, ValueError):
        return True
    else:
        return False
@task
def passed():
    print("yay")

@task
def failed():
    print("boo")

@task
def do_final_thing():
    print("final")

with Flow("example") as flow:
    raise_error.set_upstream(pre_error)
    ifelse(condition(raise_error), passed, failed)

    merged_result = merge(passed, failed)

    do_final_thing.set_upstream(merged_result)


flow_state = flow.run()
flow.visualize(flow_state=flow_state)
c

Chris White

11/21/2019, 3:37 PM
Hi @Chris O'Brien - so triggers only apply to direct upstream dependencies: if you want the behavior of
condition
to be affected by
pre_error
you’ll need to actually tell Prefect that you intend for it to be a dependency; in this case, you could do
condition(raise_error, upstream_tasks=[pre_error])
👍 1
2 Views