Is there a way to have a task take several tasks a...
# ask-community
j
Is there a way to have a task take several tasks as arguments (in a list), run even if one of them fails, and receive a list containing the exception(s) plus the results? I'm asking because I'd like to send a Slack report based on the results of several tasks. Either I add a handler to each task, but then I get three Slack messages, or I add it at the Flow level, but then I don't have access to the state and results of each task (or at least I don't know how to get it).
k
Hey @Jocelyn Boullier! I have the perfect example for you. Prefect has a `terminal_state_handler` that can be attached to the Flow. It is called when the flow run ends, and you can get the failed tasks from it.
Example:
from typing import Optional, Set

from prefect import task, Flow
from prefect.engine.state import State, TriggerFailed


def custom_terminal_state_handler(
    flow: Flow,
    state: State,
    reference_task_states: Set[State],
) -> Optional[State]:
    if state.is_failed():
        # On a local run, state.result maps each task to its final state.
        for failed_task, task_state in state.result.items():
            if not task_state.is_failed():
                continue
            # TriggerFailed means the task never ran because an upstream task failed.
            if isinstance(task_state, TriggerFailed):
                print(f"Task {failed_task.name} failed because an upstream failed")
                continue
            # For a task that raised, the state's result is the exception itself.
            print(f"Task {failed_task.name} failed with exception {task_state.result!r}")
    return state


@task
def abc(x):
    return x


@task
def bcd(x):
    raise ValueError("Foo!")


@task
def cde(x):
    return x


with Flow(
    "terminal_handler", terminal_state_handler=custom_terminal_state_handler
) as flow:
    a = abc(1)
    b = bcd(a)
    c = cde(b)

flow.run()
Then you can just send the Slack message from inside the handler.
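To get one Slack message instead of one per task, the handler could fold all task states into a single text report first. The following is only a minimal sketch: `build_report`, `FakeTask` and `FakeState` are hypothetical names, and the two stand-in classes only mimic the `.name`, `.is_failed()` and `.result` attributes used in the example above so that the snippet runs without Prefect installed.

```python
def build_report(task_states):
    """Fold a mapping of task -> final state into one text report.

    Works with any objects exposing `.name` on the task and
    `.is_failed()` / `.result` on the state (duck typing).
    """
    lines = []
    failed_count = 0
    for t, s in task_states.items():
        if s.is_failed():
            failed_count += 1
            lines.append(f"FAILED {t.name}: {s.result!r}")
        else:
            lines.append(f"OK {t.name}: {s.result!r}")
    header = f"{failed_count} of {len(task_states)} tasks failed"
    return "\n".join([header] + lines)


# Hypothetical stand-ins for Prefect's Task and State, for demonstration only.
class FakeTask:
    def __init__(self, name):
        self.name = name


class FakeState:
    def __init__(self, result, failed=False):
        self.result = result
        self._failed = failed

    def is_failed(self):
        return self._failed


example_states = {
    FakeTask("abc"): FakeState(1),
    FakeTask("bcd"): FakeState(ValueError("Foo!"), failed=True),
}
report = build_report(example_states)
print(report)
```

Inside the terminal state handler, `build_report(state.result)` would produce the text to post to Slack in a single message, for instance through an incoming webhook.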
j
Amazing, exactly what I needed! Thank you very much 🙌
Hey, I'm hitting a weird issue. When running locally, `state.result.items()` is properly populated. But when running in prod, in a k8s job, `state.result` is an empty dict. `reference_task_states` isn't, but I don't know how to correlate the results with the actual tasks (like with `state.result.items()`).
The difference between local and prod is that the local run uses the default runner, while the prod one uses `KubernetesRun`.
k
Oh sorry about that
See this thread. What gets stored in the state differs between local and Cloud runs.
j
Yes, I figured that out by tracing the bug myself: https://github.com/PrefectHQ/prefect/issues/4570
So I would need to query the Prefect API to get that, got it!
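A rough sketch of what querying the API for task run states might look like follows. It assumes a Prefect 1.x backend, where `Client.graphql` accepts a query string plus a `variables` mapping. The query's field names (`task_run`, `state`, `state_message`, `task { name }`) are assumptions to verify in the Interactive API, and `fetch_task_run_states` is a hypothetical helper. The client is passed in as an argument so the function can be exercised with a fake.

```python
# GraphQL query for the task runs of one flow run.
# ASSUMPTION: field names follow the Prefect 1.x schema; verify before use.
TASK_RUNS_QUERY = """
query TaskRuns($flow_run_id: uuid!) {
  task_run(where: {flow_run_id: {_eq: $flow_run_id}}) {
    state
    state_message
    task { name }
  }
}
"""


def fetch_task_run_states(client, flow_run_id):
    """Return {task name: (state, state message)} for one flow run.

    `client` is anything with a `graphql(query, variables=...)` method
    whose result supports ["data"]["task_run"] item access.
    """
    result = client.graphql(
        TASK_RUNS_QUERY, variables={"flow_run_id": flow_run_id}
    )
    rows = result["data"]["task_run"]
    return {
        row["task"]["name"]: (row["state"], row["state_message"])
        for row in rows
    }
```

In a terminal state handler running against the backend, this could presumably be called as `fetch_task_run_states(prefect.Client(), prefect.context.get("flow_run_id"))`, which gives back the per-task correlation that `state.result` provides locally.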