I'm trying to write a state_handler which has acce...
# ask-community
p
I'm trying to write a state_handler which has access to results of a task, is there a way to do this via
context
maybe? I saw a reply from @Kevin Kho stating:
Copy code
@task
def abc():
    prefect.context.my_var = here
this is still the way to do this?
k
Yes that’s right. Unless you want to use the KV store
👍 1
Sorry I was wrong. I re-read this and saw you said result. The above is for a variable not returned in the result. You might be able to use
new_state.result
Copy code
from prefect import Flow, task, Parameter
import prefect

def mystatehandler(task, old_state, new_state):
    if new_state.is_finished():
        <http://prefect.context.logger.info|prefect.context.logger.info>("The result was " + str(new_state.result))
    return

@task(state_handlers=[mystatehandler])
def mytask(x):
    if x == 2:
        x + "test"
    else:
        return x + 1

with Flow("...") as flow:
    mytask.map([1,2,3,4,5])

flow.run()
p
I should have been clearer with the question. I'm trying to do the following: • create something that will always run upon completion of workflow (eg a state_handler) • have that code be able to access a Parameter that's provided to the workflow, but it not used elsewhere in the flow • have that code be able to access results that were created within that flow. It's possible that I'm going about this the wrong way, but I was trying to do something more like:
Copy code
from prefect import Flow, task, Parameter
import prefect

def mystatehandler(task, old_state, new_state):
    if new_state.is_finished():
        param = context.get("Parameters", {}).get("param")
        <http://prefect.context.logger.info|prefect.context.logger.info>("this is the param" + str(param))
        <http://prefect.context.logger.info|prefect.context.logger.info>("The result was" + r) 
    return

@task(state_handlers=[mystatehandler])
def mytask(x):
    if x == 2:
        x + "test"
    else:
        return x + 1

with Flow("f_name", state_handlers=[mystatehandler]) as flow:
    param = Parameter("param")
    r = mytask.map([1,2,3,4,5])

flow.run(param="this_is_incoming")
one issue is the flow ignores
param
because as far as it's concerned it's never used. the other issue is I want the thing that gets called upon completion to have access to a Task result.
k
If your param is never used, just do:
Copy code
with Flow("f_name", state_handlers=[mystatehandler]) as flow:
    param = Parameter("param")()
    r = mytask.map([1,2,3,4,5])
and the second
()
will force usage because it’s a task. Yes to access parameters in the state handler you can use
prefect.context.parameters
The result is a bit harder. You can access the task level result with a task state handler, but a flow state handler won’t be able to get the result easily because we don’t hold all of them during execution. The easiest way would be do use the
get_task_run_result
task inside the state handler.
Copy code
def mystatehandler(flow, old_state, new_state):
    res = get_task_run_result.run(prefect.context.flow_run_id, some_task_slug)
    return
p
I guess I'm not following that second example - how do I get the task_slug?
Copy code
from prefect import Flow, task, Parameter, context
from prefect.tasks.prefect import get_task_run_result
import prefect

def mystatehandler(task, old_state, new_state):
    if new_state.is_finished():
        param = context.parameters.get("param")
        <http://prefect.context.logger.info|prefect.context.logger.info>("this is the param " + str(param))
        # prefect.tasks.prefect.flow_run.get_task_run_result
        res = get_task_run_result.run(prefect.context.flow_run_id, r)
        <http://prefect.context.logger.info|prefect.context.logger.info>("The result was" + res) 
    return

@task()
def mytask(x):
    return x + 1

with Flow("f_name", state_handlers=[mystatehandler]) as flow:
    param = Parameter("param")()
    r = mytask(2)

flow.run(param="this_is_incoming")
k
If you are doing a task level state handler like your example, you can get it with
new_state.result
. If you are using the state handler on the Flow, that is when it is harder to get a task result. You would need to know the task slug ahead of time. When you register a flow, Prefect creates the slug of the task. For example, a task with a name
do_nothing
can have a slug
do_nothing-1
. There is mostly a pattern to the slugifying, but I think the easiest way to do it would be to query the API and see what it was registered as:
Copy code
query {
  flow(where: {name: {_eq: "map_testing_fixed"}}){
			tasks {
			  id
        name
        slug
			}
  }
}
This will only work for registered flows, not flow.run()
p
this is seeming to be quite difficult, possibly the state_handler is not what I should be doing here. All I'd like to do is have a task run at the end of every flow, which can have access to whether the flow was successful or not, and have access to some results generated within the run. I can mark a task as
always_run
I believe, is there a convenient way to find out if a flow run has succeeded from within a task?
k
By definition, if a task is still running, the state of the Flow is not terminal yet (still Running, not Success or Failed). So you definitely need a state handler to perform an action based on the Flow state. So if you use a Flow level state hander,
Copy code
def mystatehandler(flow, old_state, new_state):
    if new_state.is_failed():
         ...
    return
you can determine whether or not the state is successful or failed BUT you don’t have access to the results generated inside the run because they aren’t held as they can often be too big. So you would need to use the
get_task_run_result
for that or query the API:
Copy code
def mystatehandler(flow, old_state, new_state):
    if new_state.is_failed():
         res = get_task_run_result(...)
    return
But I think it might solve your issue if you have both a task state handler and a flow state handler. The task state handler has access to the result of that immediate task.
Copy code
def mytaskstatehandler(task, old_state, new_state):
    if new_state.is_failed():
         res = new_state.result
    return
and the Flow-level one has access to the Flow end state
Copy code
def myflowstatehandler(Flow, old_state, new_state):
    if new_state.is_failed():
         ...do...something
    return
If you make both of these fire alerts, then I think you will get the information you want
flow.run()
behaves differently from
flow.register()
in this specific instance because
flow.run()
holds the task states but
flow.register()
and agent triggered runs do not
p
yeah, I'm just not following. I think I'll hack together something that will give me a general idea of if the important bits of the flow worked or not and make it a regular task set to always run.
k
Ok let me change the suggestion. 1. Use a task to report the task results 2. But still use a state handler to report the Flow state
p
so how would the state handler get at the results? It just wouldn't?
there's no way at all I can pass a data structure to a state_handler?
k
Are you with me when I say task-level state handler and flow-level state handler? I think we need to clear that up first
p
yes, no problem with the concept of there being state_handlers for flows vrs tasks.
k
So task-level state handlers have the result of their immediate tasks, but flow-level state handlers have no access to anything. Context manipulation is also highly unlikely to work because it’s really mutable.
So yes for flow-level there is not really a way and you need to load the result (by using the suggested
get_task_run_result
inside the Flow-level state handler)
p
ok - I think I can just change the logic of the system to avoid this scenario. Thanks Kevin!
k
Sounds good!