    Anze Kravanja

    1 year ago
    Hi all, I have a quick question about state handlers. I’ve created one for the use case if the flow ends up in a failed state, I want to grab all the error details from underlying tasks and send an email. What I’m doing when testing locally is:
    for t in flow.tasks:
        tr = state.result.get(t, None)
        if not tr: continue
        flow.logger.info(f"Task name '{t.name}' -> State '{tr.message}' -> Is failed: {tr.is_failed()}")
        if tr.is_failed():
            err_params['tasks'].append(OrderedDict({
                'taskName': t.name,
                'errorMessage': tr.message,
                'errorParams': tr.result if isinstance(tr.result, dict) else str(tr.result)
            }))
    Basically I’m just going through all the tasks and checking whether is_failed() is true; if so, I grab some info. This all works as intended locally, but when I package my flows in Docker and run them with the Docker agent, state.result turns out to be an empty dictionary, whereas locally I found each task’s result there. I’ve tried GCSResult and also leaving the result at its default, but in both cases state.result={} when running in Docker. Any ideas what I might be doing wrong?
    Kevin Kho

    1 year ago
    Hey @Anze Kravanja, you are not doing anything wrong. The behavior differs between a local run and a run with a backend because we don’t store this in the backend; it is too much to hold for a Flow with thousands of tasks. To get this working there are two options (and potentially a third):
    1. Query the GraphQL API for this information in the state handler.
    2. Use the new classes in 0.15.0 that query this under the hood. Link here
    3. If you are only concerned with the results of one mapped task, you can keep the failures and send them with a Slack task.
    Zach has a code snippet here where he did this (following number 1).
    Also, there is some latency before the results are updated in the Prefect database, so you may need a time.sleep(5) to allow it to sync.
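A minimal sketch of option 1, not Zach's snippet: wait for the backend to sync, then ask the GraphQL API for the failed task runs of one flow run. The query text, its field names (`task_run`, `state_message`, `task { name }`), and the `run_query` callable are assumptions for illustration; check them against your backend's schema before relying on them.

```python
import time
from typing import Any, Callable, Dict, List

# Hypothetical query shape; verify the field names against your Prefect backend schema.
FAILED_TASK_RUNS_QUERY = """
query($flow_run_id: uuid) {
  task_run(where: {flow_run_id: {_eq: $flow_run_id}, state: {_eq: "Failed"}}) {
    id
    state
    state_message
    task { name }
  }
}
"""


def fetch_failed_task_runs(
    run_query: Callable[[str, Dict[str, Any]], Dict[str, Any]],
    flow_run_id: str,
    delay_seconds: float = 5.0,
) -> List[Dict[str, str]]:
    """Wait for the backend to sync, then return name and message per failed task run."""
    time.sleep(delay_seconds)  # the latency workaround mentioned above
    response = run_query(FAILED_TASK_RUNS_QUERY, {"flow_run_id": flow_run_id})
    task_runs = response.get("data", {}).get("task_run", [])
    return [
        {"taskName": tr["task"]["name"], "errorMessage": tr["state_message"]}
        for tr in task_runs
    ]
```

Inside a state handler, `run_query` would presumably wrap the Prefect Client's GraphQL call, and `flow_run_id` would come from the run context. Passing the query function in keeps the sketch testable without a backend.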
    Anze Kravanja

    1 year ago
    Nice, thank you! I will read through the options and see what fits my use case best.
    Looks like number 2 would work. @Kevin Kho just confirming, is there no way to make the Docker agent hold on to all those results? My results are pretty small, and it would be cool if local and Docker agent behaviour were the same.
    Kevin Kho

    1 year ago
    Unfortunately not at the moment.
    Billy McMonagle

    1 year ago
    Hi @Kevin Kho could you say a little bit more about option (3) above? I'm trying to figure out how to post error details to slack and I'm not sure where to actually find that info.
    def on_failure_post_to_slack(flow, old_state, new_state):
        if new_state.is_failed():
            msg = f"Flow {flow.name} finished in state {new_state}"
            requests.post(SLACK_WEBHOOK_URL, json={"text": msg})
    (This happens to be a one task flow, so I'm doing the state handler at the flow level. Maybe that has something to do with it?)
    Kevin Kho

    1 year ago
    I have this code snippet that illustrates that idea. I use a Slack task instead of a State Handler. Check this
    For you though @Billy McMonagle, I think you can do something like this in the state handler. See the task.result.
    # Import Library
    import prefect
    from prefect import task, Flow, Parameter
    from typing import Dict
    from prefect.engine.results import GCSResult

    class AnotherError(Exception):
        def __init__(self, message, inputs: Dict):
            self.message = message
            self.inputs = inputs
            super().__init__(self.message)

    def myStateHandler(task, old_state, new_state):
        logger = prefect.context.get("logger")
        if new_state.is_finished():
            logger.info("Task Result")
            logger.info(task.result)
            logger.info(task.result.location)

    def onFailureTask(task, state):
        logger = prefect.context.get("logger")
        # Task Variables
        logger.info("Task inputs")
        logger.info(state)
        logger.info(state.result.inputs)
        # Parameters
        logger.info("Parameters")
        logger.info(prefect.context.get("parameters", {}))
        logger.info("Task Result")
        logger.info(task.result)

    @task(state_handlers=[myStateHandler], result=GCSResult(bucket="prefect-kvnkho", location="1.txt"))
    def abc(x):
        return x

    # Error Task
    @task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
    def myTask(str_param):
        logger = prefect.context.get("logger")
        logger.info(str(str_param))
        raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})

    with Flow("flow_must_fail") as flow:
        param1 = Parameter('param1', default="XXX")
        a = abc(param1)
        b = myTask(a)

    flow.run()
    Billy McMonagle

    1 year ago
    interesting. thank you, this version is more relevant I think.
    Kevin Kho

    1 year ago
    If you do it on a mapped task though, you get one Slack message per failure. The user in the first code snippet wanted one error message for all the failures collected, which is why there’s a step to collect all the errors.
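A rough sketch of that collecting step, not the linked snippet itself. It assumes a downstream task receives the list of mapped results with exception objects in place of values for the children that failed (in Prefect this would need a permissive trigger on that task; treat that as an assumption here). `summarize_failures` is a hypothetical helper name.

```python
from typing import Any, List


def summarize_failures(results: List[Any], flow_name: str) -> str:
    """Build one message from a list of mapped results, where failures are exceptions."""
    errors = [r for r in results if isinstance(r, BaseException)]
    if not errors:
        return f"Flow {flow_name}: all {len(results)} mapped runs succeeded"
    lines = [f"Flow {flow_name}: {len(errors)} of {len(results)} mapped runs failed"]
    lines += [f"- {type(e).__name__}: {e}" for e in errors]
    return "\n".join(lines)
```

The returned string can then be posted once, for example as the `text` field of a single Slack webhook call, instead of once per failed child.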
    Billy McMonagle

    1 year ago
    That makes sense. Glad to have those building blocks there for flows where it is relevant, unlike my very simple flow.
    I tried attaching the state handler to the task instead of the flow, and this is what I get posted to Slack... I think this is what I was looking for (the error is correct)
    # message from flow level state handler
    Flow Query1 finished in state <Failed: "Some reference tasks failed.">
    # message from task level state handler
    Flow execute finished in state <Failed: "Unable to locate query file /usr/src/queries/sql/query1/query.sql">
    Kevin Kho

    1 year ago
    Oh that’s good. You good now?
    Billy McMonagle

    1 year ago
    I am! And you've given me a few other things to chew on. I didn't think I needed to care about Result objects yet, but in fact they seem useful.
    Thank you!
    Kevin Kho

    1 year ago
    Of course! 👍
    Billy McMonagle

    1 year ago
    Sorry @Kevin Kho can I ask one more question... is it possible to access the flow object from the task itself?
    Kevin Kho

    1 year ago
    In the state handler, right? Unfortunately not, unless you use the GraphQL API with the Prefect Client in the State Handler to query that info. You do have access to the context though.
    Billy McMonagle

    1 year ago
    Got it! Trying to avoid graphql unless absolutely necessary. Context might help though. Thanks!
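A small sketch of leaning on the context instead of the flow object. The key names (`flow_name`, `flow_run_id`, `task_full_name`) are assumptions about what the run context holds; check which keys your Prefect version actually populates. `describe_run` is a hypothetical helper that takes the context as a plain mapping so it can run on its own.

```python
from typing import Any, Mapping


def describe_run(context: Mapping[str, Any], state_message: str) -> str:
    """Format a failure message using flow details looked up from a context mapping."""
    flow_name = context.get("flow_name", "<unknown flow>")
    flow_run_id = context.get("flow_run_id", "<unknown run>")
    task_name = context.get("task_full_name", "<unknown task>")
    return f"Task {task_name} in flow {flow_name} (run {flow_run_id}) failed: {state_message}"
```

In a task-level state handler this would presumably be called with `prefect.context` and `new_state.message`, which gives a Slack message that names the flow without any GraphQL query.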
    Anze Kravanja

    1 year ago
    @Kevin Kho one more follow-up here if you don’t mind. I’m playing with result persistence, more specifically with GCSResult. What I’m noticing is that only SUCCESS states get persisted. Is there any way to change that? What I’m thinking for my original use case is that if the flow’s (or task’s) state were persisted even when there’s an error, I could just read those states from GCS instead of pinging the GraphQL API.
    Kevin Kho

    1 year ago
    Yeah of course @Anze Kravanja. There is no way to change that. We only persist SUCCESS because that is also the backbone of flow re-runs. If you retry from failure, it pulls the persisted result of the upstream task to avoid re-running tasks that already succeeded. I don’t think there is any meaning in loading an error. I think this separates data concerns from orchestration concerns. Now, for you, I think you can do this manually, but it’s a bit tricky. You can use the Result interface like:
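    from prefect import task
    from prefect.engine.result import Result

    @task
    def abc(x):
        # Result is the base interface; use a concrete subclass (e.g. GCSResult) to actually store data
        res = Result()
        res.write("x", location=...)
        return res.location

    @task
    def abc2(location):
        res = Result().read(location)
        return res.value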
    This gives you more control over what is persisted, as you can force it to persist errors. There is one caveat that makes this tricky, though. Results are paired with Serializers like the JSONSerializer and PandasSerializer. Serializing an exception as a string is probably fine if all your other data is JSON-serializable, but you can’t mix and match depending on the type of data you’re getting unless you just use a serializer that does nothing (which you can make in 5 lines of code no problem).
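For illustration, a serializer that does nothing could look roughly like the sketch below. It is a standalone stand-in with `serialize` and `deserialize` methods; in Prefect it would presumably subclass the library's serializer base class, which is left out here so the sketch runs on its own. `PassthroughSerializer` and `encode_outcome` are hypothetical names, and the `ERROR ...` text format is just one possible convention.

```python
class PassthroughSerializer:
    """Serializer stand-in that performs no conversion: bytes in, bytes out."""

    def serialize(self, value: bytes) -> bytes:
        if not isinstance(value, bytes):
            raise TypeError("PassthroughSerializer expects bytes; encode the value first")
        return value

    def deserialize(self, value: bytes) -> bytes:
        return value


def encode_outcome(outcome: object) -> bytes:
    """Turn a normal value or an exception into bytes before handing it to the serializer."""
    if isinstance(outcome, BaseException):
        return f"ERROR {type(outcome).__name__}: {outcome}".encode("utf-8")
    return str(outcome).encode("utf-8")
```

With this split, the task decides how each value (or exception) becomes bytes, and the serializer stays the same regardless of the data type.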