    Chris O'Brien

    2 years ago
    Hi all, what’s the best way to implement a conditional on a Task Signal? For example, if a task throws an unhandled exception (so its signal is FAIL), how do we pick one branch or the other?
    @task
    def condition():
      if previous_task.Signal == 'FAIL':
        return False
      else:
        return True
    
    with Flow('test') as test:
        output = task_1() #Fails with Exception
        switch(condition, {True: yay_flow, False: boo_flow})
    Or am I approaching this wrong?
    Chris White

    2 years ago
    There are a few ways you might do this; in keeping with your sketch above you could consider:
    from prefect import task, Flow
    from prefect.triggers import any_failed
    
    @task
    def raise_error():
        raise ValueError()
    
    @task(trigger=any_failed)
    def condition(exc):
        if isinstance(exc, ValueError):
            return True
        else:
            return False
    
    with Flow("example") as flow:
        cond = condition(raise_error)
    
    flow_state = flow.run()
    flow_state.result[cond].result # True
    One of the cool things about Prefect is that exceptions are stored as the result of your task, so downstream tasks can actually receive those exceptions and act on them.
    You could also consider implementing a state_handler around the task that you expect to raise the signal; this state handler might look like:
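    from prefect.engine.state import Success

    def state_handler(task, old, new):
        # Leave intermediate states (e.g. Running) untouched so the task still runs
        if not new.is_finished():
            return new
        # A ValueError failure maps to True; any other finished state maps to False
        if new.is_failed() and isinstance(new.result, ValueError):
            return Success(result=True)
        return Success(result=False)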
    This pattern lets you react to and adjust your task’s state in a more localized way than the first, and would probably be better if you were running on Dask (since exceptions can sometimes fail to serialize).
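To see how a handler like this behaves across state transitions without running a flow, here is a minimal pure-Python sketch. The `State`, `Running`, `Success` and `Failed` classes below are hypothetical stand-ins, not Prefect's own classes (those live in `prefect.engine.state`), and the pass-through for unfinished states is my assumption about what you would want so the task still gets a chance to run.

```python
class State:
    """Hypothetical minimal stand-in for a Prefect state object."""

    def __init__(self, result=None):
        self.result = result

    def is_finished(self):
        return False

    def is_failed(self):
        return False


class Running(State):
    pass


class Success(State):
    def is_finished(self):
        return True


class Failed(State):
    def is_finished(self):
        return True

    def is_failed(self):
        return True


def state_handler(task, old, new):
    # Assumption: pass intermediate states through untouched.
    if not new.is_finished():
        return new
    # A failure caused by ValueError becomes Success(True).
    if new.is_failed() and isinstance(new.result, ValueError):
        return Success(result=True)
    # Every other finished state becomes Success(False).
    return Success(result=False)


after_value_error = state_handler(None, Running(), Failed(result=ValueError("boom")))
after_other_error = state_handler(None, Running(), Failed(result=KeyError("k")))
running = Running()
still_running = state_handler(None, State(), running)
```

With Prefect itself, the handler would be attached through the task's `state_handlers` argument, for example `@task(state_handlers=[state_handler])`.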
    Chris O'Brien

    2 years ago
    So I have been having a play and came across something I might be misinterpreting. With a some_failed(1,1) I would have expected the task cond not to trigger (as pre_error should count as one failure and raise_error as a second), unless I am not interpreting its usage correctly?
    from prefect import task, Flow
    from prefect.triggers import any_failed, some_failed
    from prefect.tasks.control_flow.conditional import ifelse, merge
    
    @task
    def pre_error():
        print("i pass")
        raise ValueError()
    
    @task
    def raise_error():
        raise ValueError()
    
    @task(trigger=some_failed(1,2))
    def condition(exc):
        if isinstance(exc, ValueError):
            return True
        else:
            return False
    
    @task
    def passed():
        print("yay")
    
    @task
    def failed():
        print("boo")
    
    @task
    def do_final_thing():
        print("final")
    
    with Flow("example") as flow:
        raise_error.set_upstream(pre_error)
        ifelse(condition(raise_error), passed, failed)
    
        merged_result = merge(passed, failed)
    
        do_final_thing.set_upstream(merged_result)
    
    
    flow_state = flow.run()
    flow.visualize(flow_state=flow_state)
    Chris White

    2 years ago
    Hi @Chris O'Brien - so triggers only apply to direct upstream dependencies: if you want the behavior of condition to be affected by pre_error you’ll need to actually tell Prefect that you intend for it to be a dependency; in this case, you could do
    condition(raise_error, upstream_tasks=[pre_error])
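To make the "direct upstream only" rule concrete, here is a minimal pure-Python sketch of the counting logic. It is a simplified model and not Prefect's implementation: `some_failed_model` and the dictionaries of booleans are hypothetical stand-ins (the real trigger works on state objects), and only integer bounds are modelled.

```python
from typing import Callable, Dict, Optional


def some_failed_model(
    at_least: Optional[int] = None, at_most: Optional[int] = None
) -> Callable[[Dict[str, bool]], bool]:
    """Hypothetical stand-in for a some_failed-style trigger (integer bounds only)."""

    def trigger(direct_upstream_failed: Dict[str, bool]) -> bool:
        # Count failures among the DIRECT upstream tasks only.
        n_failed = sum(direct_upstream_failed.values())
        if at_least is not None and n_failed < at_least:
            return False
        if at_most is not None and n_failed > at_most:
            return False
        return True

    return trigger


trigger = some_failed_model(at_least=1, at_most=1)

# condition(raise_error): only raise_error is a direct upstream dependency.
only_direct = {"raise_error": True}
# condition(raise_error, upstream_tasks=[pre_error]): both are direct dependencies.
with_pre_error = {"raise_error": True, "pre_error": True}

runs_with_one_dependency = trigger(only_direct)       # one failure, within [1, 1]
runs_with_two_dependencies = trigger(with_pre_error)  # two failures, above at_most
```

In this model the (1, 1) trigger lets the task run when only raise_error is a direct dependency, and blocks it once pre_error is added as a second failed dependency, which is the behavior the question was expecting.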