Philip MacMenamin
01/05/2022, 12:16 AM
context maybe? I saw a reply from @Kevin Kho stating:
@task
def abc():
    prefect.context.my_var = here
this is still the way to do this?
Kevin Kho
Kevin Kho
new_state.result
Kevin Kho
from prefect import Flow, task, Parameter
import prefect

def mystatehandler(task, old_state, new_state):
    # Log the result once the task run reaches a finished state
    if new_state.is_finished():
        prefect.context.logger.info("The result was " + str(new_state.result))
    return

@task(state_handlers=[mystatehandler])
def mytask(x):
    if x == 2:
        x + "test"  # int + str raises a TypeError, so this mapped run fails
    else:
        return x + 1

with Flow("...") as flow:
    mytask.map([1,2,3,4,5])

flow.run()
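For anyone unfamiliar with the handler contract used above, here is a toy model of it in plain Python. It does not import Prefect: `ToyState` and `run_with_handlers` are hypothetical stand-ins, and the rule that a handler returning None leaves the state unchanged is an assumption modeled on how Prefect 1.x appears to behave. Treat it as a sketch, not the library's implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class ToyState:
    """Hypothetical stand-in for a Prefect state; not the real class."""
    name: str
    result: Any = None

    def is_finished(self) -> bool:
        # In this toy model only Success and Failed count as finished.
        return self.name in ("Success", "Failed")


Handler = Callable[[str, ToyState, ToyState], Optional[ToyState]]


def run_with_handlers(task_name: str, fn: Callable[[], Any], handlers: List[Handler]) -> ToyState:
    """Run fn, then pass the Running -> final transition through each handler."""
    old_state = ToyState("Running")
    try:
        new_state = ToyState("Success", fn())
    except Exception as exc:
        # A failed run carries the exception as its result.
        new_state = ToyState("Failed", exc)
    for handler in handlers:
        # Assumption: a handler that returns None keeps the current state.
        new_state = handler(task_name, old_state, new_state) or new_state
    return new_state


seen = []  # collected log lines, in place of a real logger


def mystatehandler(task, old_state, new_state):
    if new_state.is_finished():
        seen.append("The result was " + str(new_state.result))
    return  # None: the state passes through unchanged


ok = run_with_handlers("mytask", lambda: 1 + 1, [mystatehandler])
bad = run_with_handlers("mytask", lambda: 2 + "test", [mystatehandler])
```

In this model the handler sees the return value for the successful run and the TypeError for the failing one, which mirrors what the task-level handler above logs for each mapped run.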
Philip MacMenamin
01/05/2022, 4:15 PM
from prefect import Flow, task, Parameter
import prefect

def mystatehandler(task, old_state, new_state):
    if new_state.is_finished():
        param = context.get("Parameters", {}).get("param")
        prefect.context.logger.info("this is the param" + str(param))
        prefect.context.logger.info("The result was" + r)
    return

@task(state_handlers=[mystatehandler])
def mytask(x):
    if x == 2:
        x + "test"
    else:
        return x + 1

with Flow("f_name", state_handlers=[mystatehandler]) as flow:
    param = Parameter("param")
    r = mytask.map([1,2,3,4,5])

flow.run(param="this_is_incoming")
Philip MacMenamin
01/05/2022, 4:16 PM
param because as far as it's concerned it's never used.
the other issue is I want the thing that gets called upon completion to have access to a Task result.
Kevin Kho
with Flow("f_name", state_handlers=[mystatehandler]) as flow:
    param = Parameter("param")()
    r = mytask.map([1,2,3,4,5])
and the second () will force usage because it’s a task.
Yes, to access parameters in the state handler you can use prefect.context.parameters
Kevin Kho
get_task_run_result task inside the state handler.
def mystatehandler(flow, old_state, new_state):
    res = get_task_run_result.run(prefect.context.flow_run_id, some_task_slug)
    return
Philip MacMenamin
01/05/2022, 5:45 PM
from prefect import Flow, task, Parameter, context
from prefect.tasks.prefect import get_task_run_result
import prefect

def mystatehandler(task, old_state, new_state):
    if new_state.is_finished():
        param = context.parameters.get("param")
        prefect.context.logger.info("this is the param " + str(param))
        # prefect.tasks.prefect.flow_run.get_task_run_result
        res = get_task_run_result.run(prefect.context.flow_run_id, r)
        prefect.context.logger.info("The result was" + res)
    return

@task()
def mytask(x):
    return x + 1

with Flow("f_name", state_handlers=[mystatehandler]) as flow:
    param = Parameter("param")()
    r = mytask(2)

flow.run(param="this_is_incoming")
Kevin Kho
new_state.result. If you are using the state handler on the Flow, that is when it is harder to get a task result. You would need to know the task slug ahead of time.
When you register a flow, Prefect creates the slug of the task. For example, a task with a name do_nothing can have a slug do_nothing-1. There is mostly a pattern to the slugifying, but I think the easiest way to do it would be to query the API and see what it was registered as:
query {
  flow(where: {name: {_eq: "map_testing_fixed"}}) {
    tasks {
      id
      name
      slug
    }
  }
}
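To make the slug lookup concrete, here is a small Python sketch that builds the query above for a given flow name and pulls the slugs out of a response. Sending the query to the API is deliberately left out. The helper names `build_slug_query` and `slugs_by_task_name` are hypothetical, and the response shape (a `data` key wrapping a list under `flow`) is an assumption about how the API replies, so check it against a real response before relying on it.

```python
import json
from typing import Dict, List


def build_slug_query(flow_name: str) -> str:
    """Return the GraphQL query from the chat with flow_name substituted."""
    # json.dumps yields a double-quoted, escaped string literal.
    quoted = json.dumps(flow_name)
    return (
        "query {\n"
        "  flow(where: {name: {_eq: %s}}) {\n"
        "    tasks {\n"
        "      id\n"
        "      name\n"
        "      slug\n"
        "    }\n"
        "  }\n"
        "}" % quoted
    )


def slugs_by_task_name(response: Dict) -> Dict[str, List[str]]:
    """Group slugs by task name from a response of the assumed shape."""
    slugs: Dict[str, List[str]] = {}
    for flow in response.get("data", {}).get("flow", []):
        for t in flow.get("tasks", []):
            slugs.setdefault(t["name"], []).append(t["slug"])
    return slugs


# Hand-written example response; the values are illustrative, not real output.
example = {
    "data": {
        "flow": [
            {"tasks": [{"id": "abc", "name": "do_nothing", "slug": "do_nothing-1"}]}
        ]
    }
}
print(slugs_by_task_name(example))
```

The slug found this way is what would be passed as the second argument to get_task_run_result.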
Kevin Kho
Philip MacMenamin
01/05/2022, 6:16 PM
always_run I believe, is there a convenient way to find out if a flow run has succeeded from within a task?
Kevin Kho
def mystatehandler(flow, old_state, new_state):
    if new_state.is_failed():
        ...
    return
You can determine whether the state is successful or failed, but you don’t have access to the results generated inside the run because they aren’t held, as they can often be too big. So you would need to use get_task_run_result for that or query the API:
def mystatehandler(flow, old_state, new_state):
    if new_state.is_failed():
        res = get_task_run_result(...)
    return
But I think it might solve your issue if you have both a task state handler and a flow state handler. The task state handler has access to the result of that immediate task.
def mytaskstatehandler(task, old_state, new_state):
    if new_state.is_failed():
        res = new_state.result
    return
and the Flow-level one has access to the Flow end state
def myflowstatehandler(flow, old_state, new_state):
    if new_state.is_failed():
        ...  # do something
    return
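As a rough illustration of pairing the two handlers, the sketch below has both append to a shared `alerts` list, which stands in for a real alert channel. The handlers only touch `is_failed()`, `result`, and a `.name` attribute on the task or flow. The `fake_state` helper and the `SimpleNamespace` objects are hypothetical stand-ins so the snippet runs without Prefect; they are not part of the library.

```python
from types import SimpleNamespace

alerts = []  # stand-in for a real alert channel (Slack, email, ...)


def mytaskstatehandler(task, old_state, new_state):
    # Task level: the result (here, the exception) is on the new state.
    if new_state.is_failed():
        alerts.append(f"task {task.name} failed: {new_state.result!r}")
    return new_state


def myflowstatehandler(flow, old_state, new_state):
    # Flow level: only the end state is inspected, no task results.
    if new_state.is_failed():
        alerts.append(f"flow {flow.name} failed")
    return new_state


def fake_state(failed, result=None):
    """Hypothetical stand-in exposing only what the handlers touch."""
    return SimpleNamespace(is_failed=lambda: failed, result=result)


# Simulate one failed task followed by the flow finishing as failed.
mytaskstatehandler(
    SimpleNamespace(name="mytask"),
    fake_state(False),
    fake_state(True, TypeError("bad operand")),
)
myflowstatehandler(SimpleNamespace(name="f_name"), fake_state(False), fake_state(True))
```

After the two simulated calls, `alerts` holds one entry with the task's exception and one entry for the flow's end state.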
If you make both of these fire alerts, then I think you will get the information you want
Kevin Kho
flow.run() behaves differently from flow.register() in this specific instance because flow.run() holds the task states but flow.register() and agent triggered runs do not
Philip MacMenamin
01/05/2022, 8:11 PM
Kevin Kho
Philip MacMenamin
01/05/2022, 8:19 PM
Philip MacMenamin
01/05/2022, 8:20 PM
Kevin Kho
Philip MacMenamin
01/05/2022, 8:26 PM
Kevin Kho
Kevin Kho
get_task_run_result inside the Flow-level state handler)
Philip MacMenamin
01/06/2022, 2:01 AM
Kevin Kho