Hi everybody! I've stuck with a problem regarding...
# ask-community
а
Hi everybody! I've stuck with a problem regarding flow state result being different when I run it locally and via Prefect Cloud In my project I need to get access to finished tasks states of a flow. So I've implemented a state handler for a flow where I am trying to catch this result. And when I run it locally (via flow.run() ) it gives me the result I've expected. But when I run it via Prefect Cloud the result is empty (the following pictures are: my state_handler, output for the flow.run(), output in the Prefect Cloud) I am not sure if it's a bug but I do want to be able to read the state result in the Cloud Thank you in advance
a
Hi! You’re right, using this logic to access task run states from a flow-level state handler returns an empty dictionary. Since there is a very similar open issue, I added your use case as a comment here. As a workaround, you can solve it by using a task-level state handler:
Copy code
import prefect
from prefect import task, Flow
from prefect.engine.state import Finished


def log_state(obj, old_state, new_state):
    if isinstance(new_state, Finished):
        logger = prefect.context.get("logger")
        task_name = prefect.context.get("task_name")
        <http://logger.info|logger.info>("Logging task run state for the task %s: %s", task_name, new_state)


@task(state_handlers=[log_state])
def say_hi():
    print("Hi")


@task(state_handlers=[log_state])
def say_hello():
    print("Hello")


with Flow("print_states_task_level") as flow:
    t1 = say_hi()
    t2 = say_hello()


if __name__ == "__main__":
    state = flow.run()
This will log the task name and its final state as follows:
Copy code
└── 15:08:14 | INFO    | Logging task run state for the task say_hi: <Success: "Task run succeeded.">
а
Ok. I'd gladly use task state handlers, but I can't see a way how to get access to finished mapped tasks (children) Maybe there is a way to do so?
a
it should work the same way if you attach this state handler to the mapped task
а
Actually no. The parent task enters the final state 'Mapped' before the first children has started to run. And I need to have the info only about the failed children but before that they must have been finished.
a
Could you build a small example showing the issue and add it either here or within the GitHub issue above? I will then reproduce and see how we can solve it
а
Here you can see what I've described earlier
I need to apply parent_state.map_states after the last children finishes its work. We can't apply this in state_handler (because the 'Mapped' state achieves too early) But you can't do that from another task's state_handler either. The only possible solution is the flow_state_handler which also can't work((((