
Philip MacMenamin

09/26/2022, 3:19 PM
Hi - in Prefect v1 I have a mapped task that fails some of the time. I'd like the final flow state to be Success if any of the mapped tasks succeed, and Failed only if all of them fail. E.g.:
import random
import prefect
from prefect import task, Flow

logger = prefect.context.get("logger")

@task(name="Task A")
def task_a(val):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")

@task
def send_log(val):
    logger.info(f"hello {val}")

l = [1,2,3,4]
with Flow("Trigger example") as flow:
    ta_s = task_a.map(val=l)
    sl_s = send_log.map(val=l, upstream_tasks=[ta_s])
# flow run should be success if ANY of task_a's succeeded.
# "something" is a placeholder: which task should go here?
flow.set_reference_tasks([something])

Mason Menges

09/26/2022, 3:29 PM
Hey @Philip MacMenamin, in Prefect 1 I believe you should be able to accomplish this with a terminal state handler: https://docs-v1.prefect.io/core/concepts/flows.html#terminal-state-handlers. The example in the docs only updates the message, but there's no reason you couldn't mark the task as skipped with signals or raise another completed state. I think you could also accomplish this with a try/except block in the task itself.
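The try/except route can be sketched in plain Python. This is only a minimal sketch, not Prefect API: `flaky_work` and `safe_task_body` are hypothetical names, and in a Prefect 1 flow the body of `safe_task_body` would presumably live inside the `@task`-decorated function. It catches the error inside the task and returns `None` as a marker, so the task itself does not fail.

```python
import random
from typing import List, Optional


def flaky_work(val: int, rng: random.Random) -> int:
    """Hypothetical stand-in for the body of task_a: fails about half the time."""
    if rng.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")
    return val


def safe_task_body(val: int, rng: random.Random) -> Optional[int]:
    """Catch the error inside the task; None marks 'this item failed'."""
    try:
        return flaky_work(val, rng)
    except ValueError:
        # Per-item clean-up or logging for the failure would go here.
        return None


rng = random.Random(0)  # seeded only to make the sketch repeatable
results: List[Optional[int]] = [safe_task_body(v, rng) for v in [1, 2, 3, 4]]
succeeded = [r for r in results if r is not None]
print(f"{len(succeeded)} of {len(results)} items succeeded")
```

The trade-off is that swallowing the exception hides the failure from the task's state, so anything that should fail the flow then has to inspect the returned markers.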

Philip MacMenamin

09/26/2022, 5:52 PM
Hi Mason, something like this?
import random
from typing import Set, Optional
from prefect.engine.state import State
import prefect
from prefect.engine import state

from prefect.triggers import all_successful, all_failed, any_failed, always_run, any_successful
from prefect import task, Flow

logger = prefect.context.get("logger")

@task(name="Task A")
def task_a(val):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")

@task(name="Task B", trigger=any_successful)
def cleanup_workdir(val):
    logger.info(f"something worked, {val}")

def custom_terminal_state_handler(
    flow: Flow,
    state: State,
    reference_task_states: Set[State],
) -> Optional[State]:
    success = False
    # iterate through reference task states looking for failures
    for task_state in reference_task_states:
        if not task_state.is_failed():
            success = True
    # update the terminal state of the Flow and return
    if not success:
        state.message = "Some important tasks have failed"
    return state



l = [1,2,3,4]
with Flow("Trigger example", terminal_state_handler=custom_terminal_state_handler) as flow:
    ta_s = task_a.map(val=l)
    cleanups = cleanup_workdir.map(val=l, upstream_tasks=[ta_s])

# flow.set_reference_tasks([ps])

flow.run()
I guess I want to mutate the returned state's status to Failed in the custom handler?
Or create a new State, set it to Failed or Success manually, and return that?

Mason Menges

09/26/2022, 6:09 PM
You'd mutate it in the custom handler and return the final state. You can also likely accomplish this with triggers, specifically the any_successful trigger: https://docs-v1.prefect.io/api/latest/triggers.html#functions. You'd define a final task that triggers if any of the upstream tasks were successful, with the mapped task as its upstream task, and then set that any_successful task as the reference task. This Discourse article gives a basic example and also outlines how this could be accomplished in 2.0: https://discourse.prefect.io/t/how-can-i-control-the-final-state-of-a-flow-run/56. Either way should work.

Philip MacMenamin

09/26/2022, 6:11 PM
I have been trying to achieve this with triggers, using the above and various examples, but I have not found a setup where a single success in a mapped upstream task can be used to set the reference state to Success.
How would I mutate the state to Failed or Success in the above example?
Any ideas, @Mason Menges?

Mason Menges

09/26/2022, 7:06 PM
Hey Philip, it looks like mapping the reference task when using triggers was causing the issues. This example works for me, i.e. it succeeds if any mapped task is successful and fails otherwise.
import random
from typing import Set
import prefect

from prefect.triggers import any_successful
from prefect import task, Flow

logger = prefect.context.get("logger")

@task(name="Task A")
def task_a(val):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")

@task(name="Task B", trigger=any_successful)
def cleanup_workdir():
    logger.info("something worked")


with Flow("Trigger example") as flow:
    l = [1,2,3,4]
    ta_s = task_a.map(val=l)
    cleanups = cleanup_workdir(upstream_tasks=[ta_s])

    flow.set_reference_tasks([cleanups])

flow.run()

Philip MacMenamin

09/26/2022, 7:12 PM
Right, it works if you don't map the downstream task, but I do need to map it. E.g.:
import random
from typing import Set
import prefect

from prefect.triggers import any_successful
from prefect import task, Flow

logger = prefect.context.get("logger")

@task(name="Task A")
def task_a(val):
    # if random.random() > 0.5:  (commented out so that task_a always fails)
    raise ValueError(f"Non-deterministic error has occurred.{val}")

@task(name="Task B", trigger=any_successful)
def cleanup_workdir(val):
    logger.info(f"something worked {val}")


with Flow("Trigger example") as flow:
    l = [1,2,3,4]
    ta_s = task_a.map(val=l)
    cleanups = cleanup_workdir.map(val=l, upstream_tasks=[ta_s])

    flow.set_reference_tasks([cleanups])

flow.run()
This example ends in Success, even though task_a always fails.

Mason Menges

09/26/2022, 7:17 PM
Is it necessary to map the downstream task for your use case? If some actions need to be performed on a failure, you could use a try/except block in the mapped task to perform those actions and then re-raise the exception you're seeing.

Philip MacMenamin

09/26/2022, 7:38 PM
So basically I need to find a downstream task that isn't mapped and use that as the reference task?
Would there be any way to mutate the state of a State object?
As in, use the custom state handler?

Mason Menges

09/26/2022, 7:49 PM
Essentially yes. You could have a final task, e.g. Final_State_Check, which only triggers if at least some of the upstream tasks were successful, and set that as the reference task. You can technically return a new State from the custom state handler:
from prefect.engine.state import State, Success, Failed

state = Success(message="Successful", context=flow.context)
though this could lead to some unintended consequences, so I'd generally suggest the former for most use cases.
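The Final_State_Check idea can be sketched as a plain function. This is a minimal sketch under stated assumptions rather than a tested Prefect flow: the name `final_state_check` is hypothetical, and it assumes that in Prefect 1 the function would be wrapped with `@task(trigger=always_run)`, called unmapped on the mapped task's output, set as the reference task, and that failed mapped children arrive in the list as exception objects.

```python
from typing import Any, List


def final_state_check(upstream_results: List[Any]) -> int:
    """Raise if every upstream result is an exception, else return the success count.

    Assumption: a failed upstream child shows up as an exception instance,
    while a successful child shows up as its return value (possibly None).
    """
    successes = [r for r in upstream_results if not isinstance(r, BaseException)]
    if not successes:
        # Raising makes this (reference) task fail, and with it the flow.
        raise RuntimeError("All mapped tasks failed")
    return len(successes)


mixed = [None, ValueError("boom"), None, ValueError("boom")]
print(final_state_check(mixed))  # prints 2
```

Because this task is not mapped, there is a single state for the flow to use as the reference, which sidesteps the mapped-reference-task behaviour discussed in this thread.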

Philip MacMenamin

09/26/2022, 7:51 PM
But that Final_State_Check can't be mapped?
It would just be a final function hung onto the back of the workflow?

Mason Menges

09/26/2022, 7:59 PM
As best I can tell, yes. It looks like what's occurring when that task is mapped in this context is that it creates a loop where one of the mapped tasks for the final function is considered Successful, so it always ends up succeeding. I'd have to dig in a bit more to determine why this might be occurring, but that's what it looked like when I tested it locally. Task handling like this is part of why we made some of the changes we did in Prefect 2: it's much easier to implement custom logic around these kinds of workflows in native Python than to force these particular cases through triggers, state handlers, etc.

Philip MacMenamin

09/26/2022, 8:08 PM
I'll migrate to v2 ASAP, but I need to get this out the door. I'm looking at trying to hack together a state handler, how do I get at the current flow's context? It's saying
AttributeError: 'Flow' object has no attribute 'context'
I guess I'm not sure how to write the function that hangs on the back of this workflow, looks up the mapped states, and loops through them.
I tried this:
import random
from typing import Set, Optional
from prefect.engine.state import State
import prefect
from prefect.engine.state import State, Success, Failed

from prefect.triggers import all_successful, all_failed, any_failed, always_run, any_successful
from prefect import task, Flow

logger = prefect.context.get("logger")

@task(name="Task A")
def task_a(val):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")

@task(name="Task B", trigger=any_successful)
def cleanup_workdir(val):
    logger.info(f"something worked, {val}")

def custom_terminal_state_handler(
    flow: Flow,
    state: State,
    reference_task_states: Set[State],
) -> Optional[State]:
    success = False
    # iterate through reference task states looking for failures
    for task_state in reference_task_states:
        if not task_state.is_failed():
            success = True
    # replace the terminal state of the Flow and return it
    if success:
        state = Success(message="Successful")
    else:
        state = Failed(message="Some important tasks have failed")
    return state



l = [1,2,3,4]
with Flow("Trigger example", terminal_state_handler=custom_terminal_state_handler) as flow:
    ta_s = task_a.map(val=l)
    cleanups = cleanup_workdir.map(val=l, upstream_tasks=[ta_s])

# flow.set_reference_tasks([ps])

flow.run()
and it almost worked:
[2022-09-26 21:29:16+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Traceback (most recent call last):
  File "map_fail_trigger.py", line 49, in <module>
    flow.run()
  File "/home/macmenaminpe/code/venv_test_d/image_portal_workflows/qa/lib/python3.8/site-packages/prefect/core/flow.py", line 1302, in run
    if task not in state.result:
TypeError: argument of type 'NoneType' is not iterable

Mason Menges

09/26/2022, 10:49 PM
Hmm, I guess maybe I'm not understanding what you mean regarding the triggers. any_successful already looks at the states of upstream tasks, including mapped tasks, so there shouldn't be a reason to loop through the states of previous tasks to determine whether one of them failed. As for the TypeError: ah, that's because the state you return doesn't have any tasks associated with it. Ultimately the reference tasks are how the flow run determines the final state, so you could inherit it from the current state, but I think that would still result in a Failed/Success based on the mapped tasks. Admittedly I'm not as familiar with using the custom state handler, so I'd probably have to do some digging to see whether that would actually work in this context. I thought it might, but I could definitely have been wrong about that. I still think triggers fit your use case a bit better here, though.
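The "inherit it from the current state" suggestion might look roughly like the sketch below. It is hedged throughout: the `State`, `Success`, `Failed` and `Mapped` classes are small stand-ins written so the sketch runs without Prefect installed, not Prefect's real classes. The sketch assumes that a mapped reference task reports a parent state exposing its children through `map_states`, and that copying `state.result` onto the new state should avoid the `NoneType` error in the traceback above. Neither assumption was verified in this thread.

```python
from typing import Any, Iterable, List, Optional


# Stand-ins for the small part of prefect.engine.state used here (NOT the real classes).
class State:
    def __init__(self, message: Optional[str] = None, result: Any = None):
        self.message = message
        self.result = result

    def is_failed(self) -> bool:
        return False


class Success(State):
    pass


class Failed(State):
    def is_failed(self) -> bool:
        return True


class Mapped(Success):
    def __init__(self, message=None, result=None, map_states: Optional[List[State]] = None):
        super().__init__(message, result)
        self.map_states = map_states or []


def any_child_succeeded(reference_task_states: Iterable[State]) -> bool:
    """Look inside mapped parents instead of trusting the parent state itself."""
    for task_state in reference_task_states:
        children = getattr(task_state, "map_states", None) or [task_state]
        if any(not child.is_failed() for child in children):
            return True
    return False


def custom_terminal_state_handler(flow, state: State, reference_task_states) -> State:
    # Carry over state.result so the returned state still has task states attached.
    if any_child_succeeded(reference_task_states):
        return Success(message="At least one mapped task succeeded", result=state.result)
    return Failed(message="All mapped tasks failed", result=state.result)
```

In a real Prefect 1 flow the stand-in classes would be replaced by imports from `prefect.engine.state`, and the handler passed as `terminal_state_handler=` to `Flow`, as in the code earlier in the thread.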