I am trying ``` if new_state.is_successful(): ...
# ask-community
a
I am trying
if new_state.is_successful():
    flow_run_id = prefect.context.flow_run_id
    flow_run_view = FlowRunView.from_flow_run_id(flow_run_id)
    b_task_run = flow_run_view.get_task_run("b-1")
a
@Arnon Kimhi There is an easier way: you can just use new_state.result - here is an example flow:
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast


def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with an exception {value}"
        )
        SlackTask(message=msg).run()
    return new_state


@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    return 1 / (b - a)


with Flow(name="state-inspection-handler") as flow:
    result = divide_numbers(1, 1)


if __name__ == "__main__":
    flow.run()
a
This is a flow handler, not a task handler
a
You can attach state handlers at both the task level and the flow level. Here I was using one at the task level, which you can see here:
def post_to_slack_on_failure(task, old_state, new_state):
and here:
@task(state_handlers=[post_to_slack_on_failure])
a
And what would new_state.result be in the case of a flow handler?
I am looking for a specific task's result in a flow handler. Is that possible?
a
No, this wouldn’t work. You would need a task-level state handler to use task results. Flows have no return values and thus no result. To retrieve results from other tasks in a state handler of one task, you would need to go through the API. That would work, but it would be a bit more cumbersome to set up since you would need to retrieve the flow run ID and the task slug.
# Signature only; the body is omitted in this excerpt.
@task
def get_task_run_result(
    flow_run_id: str, task_slug: str, map_index: int = -1, poll_time: int = 5
):
    ...
Can you describe the problem you are trying to solve? Perhaps passing data from one task to another and performing some checks that way would be easier to set up than a state handler?
from prefect import task, Flow


@task
def return_some_data():
    return "some result"

@task
def check_result_from_another_task(x):
    if x == "some result":
        pass  # do something
    else:
        pass  # do something else

with Flow("checking-results") as flow:
    data = return_some_data()
    check_result_from_another_task(data)
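For the API route mentioned above, a flow-level handler could be sketched roughly as follows. This is only an illustration, not Prefect's own API: `make_flow_result_handler`, `fetch_task_result`, `on_result`, and `FakeState` are hypothetical names, and the backend lookup is injected as a plain callable so the sketch runs without Prefect installed. The commented-out fetcher assumes Prefect 1.x with persisted task results and that the task run view exposes `get_result()`; please verify this against your Prefect version.

```python
from typing import Any, Callable


def make_flow_result_handler(
    task_slug: str,
    fetch_task_result: Callable[[str], Any],
    on_result: Callable[[Any], None],
) -> Callable[[Any, Any, Any], Any]:
    """Build a flow-level state handler that reacts to one task's result."""

    def handler(flow, old_state, new_state):
        # Only act once the flow run has reached a successful state.
        if new_state.is_successful():
            on_result(fetch_task_result(task_slug))
        # A state handler returns the new state so the run can proceed.
        return new_state

    return handler


# Assumed Prefect 1.x fetcher, left as a comment because it needs a backend:
#
#     def fetch_from_backend(task_slug):
#         flow_run = FlowRunView.from_flow_run_id(prefect.context.flow_run_id)
#         return flow_run.get_task_run(task_slug).get_result()


class FakeState:
    """Stand-in for a Prefect State, used to try the handler locally."""

    def __init__(self, successful: bool) -> None:
        self._successful = successful

    def is_successful(self) -> bool:
        return self._successful


collected = []
handler = make_flow_result_handler(
    "b-1",
    fetch_task_result=lambda slug: f"result of {slug}",
    on_result=collected.append,
)
handler(None, None, FakeState(True))   # result is fetched and forwarded
handler(None, None, FakeState(False))  # nothing happens for a failed state
print(collected)
```

Because the side effect (`on_result`) and the lookup (`fetch_task_result`) are passed in, the same flow definition could be paired with different handlers, which may fit the goal of keeping the flow itself free of side effects.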
a
Well, our use case is too specific to explain. In general, I am playing around with the tool and exploring which practices and patterns would fit our needs. I wanted to keep the flow “clean” from “side effects” such as updating other services with the flow state and results, and possibly reuse the flow with different handlers.
a
I see. I think the pattern above would be the cleanest way to act on results from another task, since passing data between tasks in memory is more efficient than retrieving it separately through the Result backend and the API. But you are the best judge of what you consider clean. When it comes to reusing code, you can certainly package modules that you want to reuse across flows; this blog post discusses the topic: https://medium.com/the-prefect-blog/the-simple-guide-to-productionizing-data-workflows-with-docker-31a5aae67c0a