p

    Philip MacMenamin

    8 months ago
    I'm trying to write a state_handler which has access to results of a task, is there a way to do this via
    context
    maybe? I saw a reply from @Kevin Kho stating:
    @task
    def abc():
        prefect.context.my_var = here
    this is still the way to do this?
    Kevin Kho

    Kevin Kho

    8 months ago
    Yes that’s right. Unless you want to use the KV store
    Sorry I was wrong. I re-read this and saw you said result. The above is for a variable not returned in the result. You might be able to use
    new_state.result
    from prefect import Flow, task, Parameter
    import prefect
    
    def mystatehandler(task, old_state, new_state):
        if new_state.is_finished():
            <http://prefect.context.logger.info|prefect.context.logger.info>("The result was " + str(new_state.result))
        return
    
    @task(state_handlers=[mystatehandler])
    def mytask(x):
        if x == 2:
            x + "test"
        else:
            return x + 1
    
    with Flow("...") as flow:
        mytask.map([1,2,3,4,5])
    
    flow.run()
    p

    Philip MacMenamin

    8 months ago
    I should have been clearer with the question. I'm trying to do the following: • create something that will always run upon completion of workflow (eg a state_handler) • have that code be able to access a Parameter that's provided to the workflow, but it not used elsewhere in the flow • have that code be able to access results that were created within that flow. It's possible that I'm going about this the wrong way, but I was trying to do something more like:
    from prefect import Flow, task, Parameter
    import prefect
    
    def mystatehandler(task, old_state, new_state):
        if new_state.is_finished():
            param = context.get("Parameters", {}).get("param")
            <http://prefect.context.logger.info|prefect.context.logger.info>("this is the param" + str(param))
            <http://prefect.context.logger.info|prefect.context.logger.info>("The result was" + r) 
        return
    
    @task(state_handlers=[mystatehandler])
    def mytask(x):
        if x == 2:
            x + "test"
        else:
            return x + 1
    
    with Flow("f_name", state_handlers=[mystatehandler]) as flow:
        param = Parameter("param")
        r = mytask.map([1,2,3,4,5])
    
    flow.run(param="this_is_incoming")
    one issue is the flow ignores
    param
    because as far as it's concerned it's never used. the other issue is I want the thing that gets called upon completion to have access to a Task result.
    Kevin Kho

    Kevin Kho

    8 months ago
    If your param is never used, just do:
    with Flow("f_name", state_handlers=[mystatehandler]) as flow:
        param = Parameter("param")()
        r = mytask.map([1,2,3,4,5])
    and the second
    ()
    will force usage because it’s a task. Yes to access parameters in the state handler you can use
    prefect.context.parameters
    The result is a bit harder. You can access the task level result with a task state handler, but a flow state handler won’t be able to get the result easily because we don’t hold all of them during execution. The easiest way would be do use the
    get_task_run_result
    task inside the state handler.
    def mystatehandler(flow, old_state, new_state):
        res = get_task_run_result.run(prefect.context.flow_run_id, some_task_slug)
        return
    p

    Philip MacMenamin

    8 months ago
    I guess I'm not following that second example - how do I get the task_slug?
    from prefect import Flow, task, Parameter, context
    from prefect.tasks.prefect import get_task_run_result
    import prefect
    
    def mystatehandler(task, old_state, new_state):
        if new_state.is_finished():
            param = context.parameters.get("param")
            <http://prefect.context.logger.info|prefect.context.logger.info>("this is the param " + str(param))
            # prefect.tasks.prefect.flow_run.get_task_run_result
            res = get_task_run_result.run(prefect.context.flow_run_id, r)
            <http://prefect.context.logger.info|prefect.context.logger.info>("The result was" + res) 
        return
    
    @task()
    def mytask(x):
        return x + 1
    
    with Flow("f_name", state_handlers=[mystatehandler]) as flow:
        param = Parameter("param")()
        r = mytask(2)
    
    flow.run(param="this_is_incoming")
    Kevin Kho

    Kevin Kho

    8 months ago
    If you are doing a task level state handler like your example, you can get it with
    new_state.result
    . If you are using the state handler on the Flow, that is when it is harder to get a task result. You would need to know the task slug ahead of time. When you register a flow, Prefect creates the slug of the task. For example, a task with a name
    do_nothing
    can have a slug
    do_nothing-1
    . There is mostly a pattern to the slugifying, but I think the easiest way to do it would be to query the API and see what it was registered as:
    query {
      flow(where: {name: {_eq: "map_testing_fixed"}}){
    			tasks {
    			  id
            name
            slug
    			}
      }
    }
    This will only work for registered flows, not flow.run()
    p

    Philip MacMenamin

    8 months ago
    this is seeming to be quite difficult, possibly the state_handler is not what I should be doing here. All I'd like to do is have a task run at the end of every flow, which can have access to whether the flow was successful or not, and have access to some results generated within the run. I can mark a task as
    always_run
    I believe, is there a convenient way to find out if a flow run has succeeded from within a task?
    Kevin Kho

    Kevin Kho

    8 months ago
    By definition, if a task is still running, the state of the Flow is not terminal yet (still Running, not Success or Failed). So you definitely need a state handler to perform an action based on the Flow state. So if you use a Flow level state hander,
    def mystatehandler(flow, old_state, new_state):
        if new_state.is_failed():
             ...
        return
    you can determine whether or not the state is successful or failed BUT you don’t have access to the results generated inside the run because they aren’t held as they can often be too big. So you would need to use the
    get_task_run_result
    for that or query the API:
    def mystatehandler(flow, old_state, new_state):
        if new_state.is_failed():
             res = get_task_run_result(...)
        return
    But I think it might solve your issue if you have both a task state handler and a flow state handler. The task state handler has access to the result of that immediate task.
    def mytaskstatehandler(task, old_state, new_state):
        if new_state.is_failed():
             res = new_state.result
        return
    and the Flow-level one has access to the Flow end state
    def myflowstatehandler(Flow, old_state, new_state):
        if new_state.is_failed():
             ...do...something
        return
    If you make both of these fire alerts, then I think you will get the information you want
    flow.run()
    behaves differently from
    flow.register()
    in this specific instance because
    flow.run()
    holds the task states but
    flow.register()
    and agent triggered runs do not
    p

    Philip MacMenamin

    8 months ago
    yeah, I'm just not following. I think I'll hack together something that will give me a general idea of if the important bits of the flow worked or not and make it a regular task set to always run.
    Kevin Kho

    Kevin Kho

    8 months ago
    Ok let me change the suggestion.1. Use a task to report the task results 2. But still use a state handler to report the Flow state
    p

    Philip MacMenamin

    8 months ago
    so how would the state handler get at the results? It just wouldn't?
    there's no way at all I can pass a data structure to a state_handler?
    Kevin Kho

    Kevin Kho

    8 months ago
    Are you with me when I say task-level state handler and flow-level state handler? I think we need to clear that up first
    p

    Philip MacMenamin

    8 months ago
    yes, no problem with the concept of there being state_handlers for flows vrs tasks.
    Kevin Kho

    Kevin Kho

    8 months ago
    So task-level state handlers have the result of their immediate tasks, but flow-level state handlers have no access to anything. Context manipulation is also highly unlikely to work because it’s really mutable.
    So yes for flow-level there is not really a way and you need to load the result (by using the suggested
    get_task_run_result
    inside the Flow-level state handler)
    p

    Philip MacMenamin

    8 months ago
    ok - I think I can just change the logic of the system to avoid this scenario. Thanks Kevin!
    Kevin Kho

    Kevin Kho

    8 months ago
    Sounds good!