# ask-community
a
Hi all, I have a quick question about state handlers. I’ve created one for the use case where, if the flow ends up in a failed state, I want to grab all the error details from the underlying tasks and send an email. What I’m doing when testing locally is:
Copy code
for t in flow.tasks:
        tr = state.result.get(t, None)
        if not tr: continue
        flow.logger.info(f"Task name '{t.name}' -> State '{tr.message}' -> Is failed: {tr.is_failed()}")
        if tr.is_failed():
            err_params['tasks'].append(OrderedDict({
                'taskName': t.name,
                'errorMessage': tr.message,
                'errorParams': tr.result if isinstance(tr.result, dict) else str(tr.result)
            }))
Basically just going through all the tasks and checking if is_failed is true; if so, I grab some info. This all works as intended locally, but when I package my flows in a Docker image and run with the Docker agent, it turns out state.result is an empty dictionary, while locally I found each task's result there. I’ve played with GCSResult and with leaving it at the default, but in both cases, while running in Docker, state.result={}. Any ideas what I might be doing wrong?
k
Hey @Anze Kravanja, you are not doing anything wrong. The behavior differs between running locally and running with a backend because we don’t store this in the backend: it is too much to hold for a flow with thousands of tasks. To get this working there are two options (and potentially a third): 1. Query the GraphQL API for this information in the state handler. 2. Use the new classes in 0.15.0 that query this under the hood. Link here. 3. If you are only concerned with the results of one mapped task, you can keep the failures and send them with a Slack task.
Zach has a code snippet here where he did this (following number 1).
And there is some latency before the results are updated in the Prefect database, so you may need a
time.sleep(5)
to allow it to sync.
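A rough sketch of option 1, a flow-level state handler that pulls failed task runs through the Prefect Client (the task_run fields and filter syntax are assumptions about the Cloud/Server GraphQL schema, so adjust as needed):
Copy code
import time
import prefect
from prefect.client import Client

def collect_failed_tasks(flow, old_state, new_state):
    if new_state.is_failed():
        # give the backend a moment to sync task run states
        time.sleep(5)
        flow_run_id = prefect.context.get("flow_run_id")
        query = """
        query($flow_run_id: uuid) {
          task_run(where: {flow_run_id: {_eq: $flow_run_id}, state: {_eq: "Failed"}}) {
            name
            state_message
            task { name slug }
          }
        }
        """
        result = Client().graphql(query, variables={"flow_run_id": flow_run_id})
        failures = result["data"]["task_run"]
        # build the email body from `failures` here
    return new_state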
a
Nice, thank you! I will read through the options and see what fits my use case best.
Looks like number 2 would work. @Kevin Kho just confirming, there is no way to still make the Docker agent hold on to all those results? My results are pretty small, and it would be nice if the local and Docker agent behaviour were the same.
k
Unfortunately not at the moment.
b
Hi @Kevin Kho could you say a little bit more about option (3) above? I'm trying to figure out how to post error details to slack and I'm not sure where to actually find that info.
Copy code
import requests  # SLACK_WEBHOOK_URL defined elsewhere

def on_failure_post_to_slack(flow, old_state, new_state):
    if new_state.is_failed():
        msg = f"Flow {flow.name} finished in state {new_state}"
        requests.post(SLACK_WEBHOOK_URL, json={"text": msg})
(This happens to be a one task flow, so I'm doing the state handler at the flow level. Maybe that has something to do with it?)
k
I have this code snippet that illustrates that idea. I use a Slack task instead of a State Handler. Check this
For you though @Billy McMonagle, I think you can do something like this in the state handler. See the
task.result
.
Copy code
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult

class AnotherError(Exception):
    def __init__(self, message, inputs: Dict):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)

def myStateHandler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)

def onFailureTask(task, state):
    logger = prefect.context.get("logger")
    # Task Variables
    logger.info("Task inputs")
    logger.info(state)
    logger.info(state.result.inputs)
    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
    logger.info("Task Result")
    logger.info(task.result)

@task(state_handlers=[myStateHandler], result=GCSResult(bucket="prefect-kvnkho", location="1.txt"))
def abc(x):
    return x

# Error Task
@task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
def myTask(str_param):
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})

with Flow("flow_must_fail") as flow:
    param1 = Parameter('param1', default="XXX")
    a = abc(param1)
    b = myTask(a)

flow.run()
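For the Slack case specifically, a minimal task-level version could look like this (reusing requests and SLACK_WEBHOOK_URL from your snippet; for a Failed state, new_state.message carries the error text):
Copy code
import requests
from prefect import task

# SLACK_WEBHOOK_URL defined elsewhere
def task_failure_to_slack(task, old_state, new_state):
    if new_state.is_failed():
        msg = f"Task {task.name} failed: {new_state.message}"
        requests.post(SLACK_WEBHOOK_URL, json={"text": msg})
    return new_state

@task(state_handlers=[task_failure_to_slack])
def execute():
    ...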
b
interesting. thank you, this version is more relevant I think.
k
If you do it on a mapped task though, you get one Slack message per failure. The user in the first code snippet wanted one error message covering all failures, which is why there’s a step to collect all the errors.
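As a sketch of that collection step, a downstream task with an any_failed trigger can reduce over the mapped results and send one message (this assumes failed mapped children arrive as Exception instances in the upstream result, and borrows SLACK_WEBHOOK_URL from above):
Copy code
import requests
from prefect import task
from prefect.triggers import any_failed

@task(trigger=any_failed)
def report_failures(results):
    # failed children show up as Exception instances in the reduced list
    errors = [str(r) for r in results if isinstance(r, Exception)]
    msg = "Mapped task failures:\n" + "\n".join(errors)
    requests.post(SLACK_WEBHOOK_URL, json={"text": msg})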
b
That makes sense. Glad to have those building blocks there for flows where it is relevant, unlike my very simple flow.
I tried attaching the state handler to the task instead of the flow, and this is what I get posted to Slack... I think this is what I was looking for (the error is correct)
Copy code
# message from flow level state handler
Flow Query1 finished in state <Failed: "Some reference tasks failed.">
# message from task level state handler
Flow execute finished in state <Failed: "Unable to locate query file /usr/src/queries/sql/query1/query.sql">
k
Oh that’s good. You good now?
b
I am! And you've given me a few other things to chew on. I didn't think I needed to care about
Result
objects yet, but in fact they seem useful
thank you
k
Of course! 👍
b
Sorry @Kevin Kho can I ask one more question... is it possible to access the flow object from the task itself?
k
In the state handler right? Unfortunately not, unless you use the GraphQL API with the Prefect Client in the State Handler to query that info. You do have access to the
context
though
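For example, a state handler can pull flow-level info straight from context (flow_name and flow_run_id are standard context keys during a backend run):
Copy code
import prefect

def my_handler(task, old_state, new_state):
    flow_name = prefect.context.get("flow_name")
    flow_run_id = prefect.context.get("flow_run_id")
    prefect.context.get("logger").info(f"{flow_name} / {flow_run_id}")
    return new_state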
b
Got it! Trying to avoid graphql unless absolutely necessary. Context might help though. Thanks!
a
@Kevin Kho one more follow up here if you don’t mind. I’m playing with result persistence, more specifically with GCSResult. But what I’m noticing is that only SUCCESS states get persisted. Is there any way to change that? What I’m thinking is, for my original use case, if the flow’s (or task’s) state persisted even when there’s an error, I could just read those states from GCS instead of pinging the GraphQL API.
k
Yeah of course @Anze Kravanja, there is no way to change that. We only persist
SUCCESS
because that is also the backbone of flow re-runs. If you re-try from failure, it pulls the result of the upstream task that was persisted, to avoid re-running already successful tasks. I don’t think there is much meaning in loading an error back in; I think this separates data concerns from orchestration concerns. Now, for you, I think you can manually do this, but it’s a bit tricky. You can use the
Result
interface like:
Copy code
@task
def abc(x):
    res = Result()
    res.write("x", location=...)
    return res.location

@task
def abc2(location):
    res = Result().read(location)
This gives you more control over what is persisted, as you can force it to persist errors. There is one caveat though that makes this tricky.
Results
are paired with
Serializers
like the
JSONSerializer
and
PandasSerializer
. Serializing an exception as a string is probably fine if all your other data is JSON-serializable, but you can’t mix and match serializers depending on the type of data you’re getting, unless you use a serializer that does nothing (which you can make in 5 lines of code, no problem).
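For reference, a pass-through serializer along those lines (this sketch assumes you hand it bytes, or encode/pickle the value yourself before writing):
Copy code
from prefect.engine.serializers import Serializer

class PassthroughSerializer(Serializer):
    # assumes `value` is already bytes; encode or pickle it yourself otherwise
    def serialize(self, value) -> bytes:
        return value

    def deserialize(self, value: bytes):
        return value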
👍 1