Jeff Williams

05/21/2021, 3:11 PM
Hello all. I have an odd situation that I can't seem to figure out. The objective is to capture information about flow and task runs and save them off to some "external" storage. The environment is ultimately going to be on a Google Cloud instance, but I am not there yet. I have worked through things locally, meaning I have my test script written to run locally. I am taking the next step, which is to register the flow with a local agent (local infrastructure - no problems there) and then run it on a local agent. All of that seems to work fine, for the most part. I am using a custom terminal state handler to capture the final states of the tasks and flow that has just finished executing. The specific issue I am facing right now is that the `state.result` dictionary is populated when run locally but it is not when it is run using a local agent. Specifically, if I do `len(state.result.keys())` I get a non-zero value locally but get zero when run on the agent. Any ideas as to why?

Kevin Kho

05/21/2021, 3:13 PM
Hi @Jeff Williams, I have an example of a terminal state handler here that I can share with you and maybe you can compare?
from typing import Optional, Set

from prefect import Flow, task
from prefect.engine.state import State, TriggerFailed


def custom_terminal_state_handler(
    flow: Flow,
    state: State,
    reference_task_states: Set[State],
) -> Optional[State]:
    # Only report details when the flow ends in a failed state.
    if state.is_failed():
        # With flow.run(), state.result maps each Task to its final State.
        for flow_task, task_state in state.result.items():
            if not task_state.is_failed():
                continue
            # TriggerFailed means the task never ran because an upstream task failed.
            if isinstance(task_state, TriggerFailed):
                print(f"Task {flow_task.name} failed because an upstream failed")
                continue
            print(f"Task {flow_task.name} failed with exception {task_state.result!r}")
    return state


@task
def abc(x):
    return x


@task
def bcd(x):
    # Fails on purpose so the handler has something to report.
    raise ValueError("Foo!")


@task
def cde(x):
    return x


with Flow(
    "terminal_handler", terminal_state_handler=custom_terminal_state_handler
) as flow:
    a = abc(1)
    b = bcd(a)
    c = cde(b)

flow.run()
Or well, I guess try this with local agent?

Jeff Williams

05/21/2021, 3:14 PM
Running the terminal state handler is not the issue. That is fine. The question is how the Agent populates values differently versus running locally.

Kevin Kho

05/21/2021, 3:16 PM
The things stored in flow.run() are not the same as what is stored in the backend. I don’t know the specifics. I will find someone who can explain this better than me.

Jeff Williams

05/21/2021, 3:25 PM
Point of clarification to my original post. state here is the same as the state in the terminal handler parameter list. That may be obvious, but wanted to make sure it was clear.

Zach Angell

05/21/2021, 3:36 PM
Hi @Jeff Williams, this is an understandable point of confusion. Getting into the weeds of Prefect a bit - When running locally, all state information is stored in context handled by Core. When you call `flow.run()`, the `Flow` object specifies all of its `task` objects should be included as `return_tasks` in the `Flow`'s final `state`. When running against Server/Cloud, `return_tasks` defaults to `None`, which is why your `state` object has no results. The reasoning here is that all task results for a flow run can be a very large piece of data, especially with mapped tasks. We don't have an efficient way of interacting with this from Core just yet. However, we're working on related features that will make this feasible. Is there something specific you're trying to accomplish with the terminal state handler? I'm happy to help find a workaround.

Jeff Williams

05/21/2021, 3:46 PM
@Zach Angell - Thanks for that helpful reply. We are wanting to write some information about the flow / tasks to a persistent storage (Firestore, right now) so that it is easier for some other groups in our organization to understand the flow state. The other groups may not have access to the Prefect UI nor have the technical expertise to be able to query the GraphQL endpoint. Does that help explain things a bit?
@Zach Angell - To add some more information, I would also like to be able to get some fundamental statistics on each task's result object such as type of return, length if a list, number of keys / values if it is a dict, that sort of thing.
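
The kind of statistics described here (type of the return value, length if it is a list, number of keys if it is a dict) can be computed with plain Python once a task's result value is in hand. A minimal sketch, where `describe_result` is a hypothetical helper name and the set of "sized" types is an assumption about what is worth measuring:

```python
from typing import Any, Dict


def describe_result(value: Any) -> Dict[str, Any]:
    """Return basic, storage-friendly statistics about a task result."""
    stats: Dict[str, Any] = {"type": type(value).__name__}
    if isinstance(value, dict):
        # For dicts, the number of keys equals the number of values.
        stats["num_keys"] = len(value)
    elif isinstance(value, (list, tuple, set, str, bytes)):
        stats["length"] = len(value)
    return stats


print(describe_result([1, 2, 3]))   # {'type': 'list', 'length': 3}
print(describe_result({"a": 1}))    # {'type': 'dict', 'num_keys': 1}
print(describe_result(None))        # {'type': 'NoneType'}
```

The returned dictionary only contains strings and integers, so it should be straightforward to write to a document store such as Firestore.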

Zach Angell

05/21/2021, 4:05 PM
Yep that's helpful context, thanks! Some of the info you mentioned is fairly easy to get in the terminal state handler. For example, whether or not `state` is successful or failed. For more detailed info like task results, I'd recommend querying the Prefect API for the time being. This could be done in the terminal state handler if needed in a timely manner, or as a batch job after the fact. Some of the work in this PR (which will make what you want to do much easier) might provide inspiration https://github.com/PrefectHQ/prefect/pull/4434/files. In particular, I'd take a look through `src/backend/flow_run.py` and `src/backend/task_run.py`
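
Querying the API for a flow run's task runs might look roughly like the sketch below. It is not taken from the PR above. `fetch_task_runs` is a hypothetical helper that accepts any client object with a `graphql(query, variables=...)` method, which is how `prefect.Client` is used in Prefect 1.x. The `task_run` fields in the query are assumptions about the backend schema and are worth checking in the Interactive API before relying on them.

```python
from typing import Any, Dict, List

# Assumed schema: a task_run table filterable by flow_run_id, with a related task.
TASK_RUNS_QUERY = """
query TaskRuns($flow_run_id: uuid!) {
  task_run(where: {flow_run_id: {_eq: $flow_run_id}}) {
    id
    state
    state_message
    task { name }
  }
}
"""


def fetch_task_runs(client: Any, flow_run_id: str) -> List[Dict[str, Any]]:
    """Return one flat record per task run of the given flow run."""
    response = client.graphql(
        TASK_RUNS_QUERY, variables={"flow_run_id": flow_run_id}
    )
    return [
        {
            "id": run["id"],
            "task_name": run["task"]["name"],
            "state": run["state"],
            "message": run["state_message"],
        }
        for run in response["data"]["task_run"]
    ]
```

In a terminal state handler running under an agent, the call would presumably be `fetch_task_runs(prefect.Client(), prefect.context.get("flow_run_id"))`. The same function could also be run later as a batch job with a stored flow run ID.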

Jeff Williams

05/21/2021, 4:28 PM
Thanks. I will take a look....