
Serdar Tumgoren

07/01/2021, 12:31 AM
Hey everyone, I’ve been banging my head on the keyboard for a bit on this one, and would be indebted for any guidance. I’m trying to create a flow that:
1. adds metadata to FAILED signals via a mapped task
2. applies a state handler to the flow that can look up metadata from the FAILED task states and alert the user with a final notification that includes detailed metadata from each failed task
The strategy is based on the notification docs: https://docs.prefect.io/core/concepts/notifications.html#responding-to-state
When I execute a basic flow run locally (without using Server or Cloud), the script works as expected and I get a log message listing the failures (I’m testing with a logger, but the goal is to eventually alert via Slack). But when I run the same code against Server or Cloud, the individual task failures no longer seem accessible via the state handler. It seems like `new_state.result` inside the state handler is where the problem lies. Locally, it is a dict populated with `(<task>, <state>)` items; on Server or Cloud, `new_state.result` appears to return an empty dict. I’ve created two basic flow scripts to demonstrate this behavior (one for a local manual run, the other for Server or Cloud) and will attach them below. Each contains a `NOTE` marking the divergent status of `new_state.result`. I’d be grateful if someone could point out what I’m doing wrong!
Running this with `python test_flow.py` issues the expected logging (see toward the bottom of the script).
This is a version modified for Server/Cloud. It does not issue the expected logging when run via Server or Cloud UI

Amanda Wee

07/01/2021, 1:59 AM
Looking at Prefect's code without testing, so I may be suggesting gibberish, but I wonder if you could try:
raise signals.FAIL(e, result=(x, y))
If I'm reading the code right, this should set the result of the `Failed` state associated with the `FAIL` signal to `(x, y)`, so `new_state.result` in the state handler should have it.
Nah, I decided to test after all, and nope, it didn't work: the new_state.result was still empty.

Serdar Tumgoren

07/01/2021, 2:20 AM
@Amanda Wee Thanks for the suggestion! Alas, this doesn’t seem to fix the issue 😕
Yep, that’s what I’m seeing. I feel like I’m misunderstanding something very fundamental about how state handling and signals work in a Server/Cloud context…

Amanda Wee

07/01/2021, 2:21 AM
I believe it should be the same. We could be looking at a bug.

Serdar Tumgoren

07/01/2021, 2:23 AM
Oh interesting. Well, let me know if you have any other thoughts. I’ll keep poking around just in case (there are still some things I’m trying to explore through logging and spelunking the source code). Thanks!

Amanda Wee

07/01/2021, 2:37 AM
Well, I turned your flow state handler into a task state handler and it worked in Prefect Server, although I expect that is rather unwieldy since you have to specify it for each task. On the other hand, you're explicitly raising the signal for each task too, so it isn't that much more onerous.
Okay, I still think the Prefect team should look into this as a possible bug, but here's my workaround with a flow state handler that creates a singleton to track failure metadata, and a task state handler to record the failure metadata:
import prefect
from prefect import Flow, task
from prefect.engine import signals


class FlowFailureData:
    # Process-wide singleton that collects failure metadata for one flow run
    instance = None

    def __init__(self):
        self.failures = []

    def __str__(self):
        # One "task: metadata" entry per line
        return "\n".join(f"{name}: {metadata}" for name, metadata in self.failures)


def handle_flow_failure_data(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    msg = prefect.context.get("flow_name", flow.name)
    if new_state.is_running():
        # Start a fresh collector at the beginning of each flow run
        FlowFailureData.instance = FlowFailureData()
    elif new_state.is_finished():
        if new_state.is_successful():
            msg += " ran successfully."
        elif FlowFailureData.instance and FlowFailureData.instance.failures:
            msg += " had failures:\n" + str(FlowFailureData.instance)
        else:
            msg += " failed."
        # Log exactly once per finished run
        logger.info(msg)
    return new_state


def notify_on_finish(task, old_state, new_state):
    # Task state handler: record the metadata attached to each FAIL signal
    if new_state.is_failed() and FlowFailureData.instance is not None:
        FlowFailureData.instance.failures.append((task.name, new_state.result))
    return new_state


@task(state_handlers=[notify_on_finish])
def div(x, y):
    try:
        return x / y
    except Exception as e:
        # Attach the inputs as the result of the Failed state
        raise signals.FAIL(e, result=(x, y))


with Flow("Test - flow state handler", state_handlers=[handle_flow_failure_data]) as flow:
    x_vals = [4, 2]
    y_vals = [2, 0]
    div.map(x=x_vals, y=y_vals)

# flow.run()
flow.register(project_name="Test flows")
In this contrived test example's case, though, strictly speaking you don't need to record the metadata separately, because it is the same as the task inputs, which you can inspect. But I imagine you have more complex scenarios in mind and want to report them to Slack immediately.
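In plain Python, independent of Prefect, the input-inspection idea can be sketched as pairing each mapped input with its result. This assumes (not verified here) that the list a downstream task receives from a mapped task is in the same order as the mapped inputs, and that failures arrive as `Exception` instances, which is what an unhandled exception in a task produces. `split_by_outcome` is a hypothetical helper:

```python
from typing import Any, List, Sequence, Tuple


def split_by_outcome(
    inputs: Sequence[Any], results: Sequence[Any]
) -> Tuple[List[Any], List[Any]]:
    """Pair each mapped input with its result and split into (succeeded, failed).

    Assumes results[i] came from inputs[i] and that a failed child appears
    as an Exception instance in the results list.
    """
    if len(inputs) != len(results):
        raise ValueError("inputs and results must have the same length")
    succeeded, failed = [], []
    for value, result in zip(inputs, results):
        # Keep the original input (not the result) so failures can be named
        if isinstance(result, Exception):
            failed.append(value)
        else:
            succeeded.append(value)
    return succeeded, failed


# Mirrors the division example: 4 / 2 succeeds, 2 / 0 fails
pairs = list(zip([4, 2], [2, 0]))
print(split_by_outcome(pairs, [2.0, ZeroDivisionError("division by zero")]))
# → ([(4, 2)], [(2, 0)])
```

The same pairing works for postal codes: pass the list of codes as `inputs` and the collected results as `results`.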

Serdar Tumgoren

07/01/2021, 1:18 PM
@Amanda Wee You rock! The singleton approach does the trick! I should admit that it’s also quite possible I’m over-complicating things here. My end goal is to compile a Slack alert that lists basic metadata about any mapped tasks that had a failure. My current real-world use case is very similar to the contrived division test we’re discussing here. I only need to log one minor bit of metadata that is, in fact, passed into the mapped task (it’s a U.S. state postal code). My goal is not to send a notification whenever a single task fails, but to send a single alert at the end of the flow run that tells users which U.S. states (not Prefect task/flow states) failed. It sounds like you’re suggesting I could simply inspect the inputs for all tasks to get this bit of metadata. That’d work fine, although I haven’t been able to figure out quite how to extract that info (I tried `flow.get_tasks`, for example, but the `inputs` on the tasks appeared to be Prefect metadata as opposed to the actual values passed in as task arguments). Perhaps I’m missing something there? I should also mention that I’ll eventually be running this in parallel with the DaskExecutor, so I’m not sure whether that poses any potential landmines for the singleton approach. I’ll test that out with the LocalDaskExecutor to verify. Meanwhile, let me know if you think I can use a simpler `inputs` approach. And many, many thanks for your help so far!
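On the DaskExecutor question: a class-level singleton like `FlowFailureData` only exists inside one Python process. That should be fine with thread-based execution (the `LocalDaskExecutor`, whose default scheduler I believe is thread-based), but failures recorded inside separate worker processes, as on a distributed `DaskExecutor` cluster, would not be visible to the flow's state handler. For the threaded case, a lock around the shared list is a cheap safeguard. A minimal stdlib sketch; `FailureCollector` is a hypothetical name, not a Prefect API:

```python
import threading
from typing import List, Tuple


class FailureCollector:
    """Process-local, thread-safe store of (label, detail) failure records."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._failures: List[Tuple[str, str]] = []

    def record(self, label: str, detail: str) -> None:
        # The lock keeps appends from concurrent worker threads consistent
        with self._lock:
            self._failures.append((label, detail))

    def snapshot(self) -> List[Tuple[str, str]]:
        # Return a copy so callers never iterate the live list
        with self._lock:
            return list(self._failures)

    def __str__(self) -> str:
        # One failure per line
        return "\n".join(f"{label}: {detail}" for label, detail in self.snapshot())


if __name__ == "__main__":
    collector = FailureCollector()
    threads = [
        threading.Thread(target=collector.record, args=(f"task-{i}", "failed"))
        for i in range(8)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(len(collector.snapshot()))  # 8
```

This does nothing for the multi-process case; there, the results have to travel through Prefect itself (task results or the API).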
@Amanda Wee I did some spelunking on Prefect GH issues and it sounds like I’m not the first to run into this 🙂 Seems like this is expected behavior. Is that correct @Kevin Kho? https://github.com/PrefectHQ/prefect/issues/4570

Kevin Kho

07/01/2021, 1:40 PM
I didn’t read all of this yet, but yes, the result is different for local runs and Cloud/Server runs. This message contains the explanation

Serdar Tumgoren

07/01/2021, 1:46 PM
@Kevin Kho That’s super helpful! Thanks for the speedy reply! Is it still the recommended approach to query the GraphQL API for task state info?

Kevin Kho

07/01/2021, 1:51 PM
Yes, at the moment. The next release will have a simpler interface for it. Check this out for the query. Note that he had to include a 5-second wait because there is a delay before the tasks get updated in the database. It might be more instant for Server. That whole thread talks about sending a Slack message that lists which tasks failed.
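Rather than a fixed 5-second sleep, one option is to poll until every task run reports a finished state, or give up after a timeout. A minimal stdlib sketch; `fetch_states` stands in for whatever function runs the GraphQL query and returns the task-run state names (hypothetical here), and the set of finished state names is an assumption to adjust:

```python
import time
from typing import Callable, Iterable, List

# Assumed terminal state names; adjust to the states your flows actually produce
FINISHED = {"Success", "Failed", "Skipped", "Cancelled", "Mapped", "TriggerFailed", "TimedOut"}


def wait_for_task_states(
    fetch_states: Callable[[], Iterable[str]],
    timeout: float = 30.0,
    interval: float = 1.0,
) -> List[str]:
    """Call fetch_states() until every state is finished or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        states = list(fetch_states())
        if states and all(s in FINISHED for s in states):
            return states
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task runs still not finished: {states}")
        time.sleep(interval)


if __name__ == "__main__":
    # Fake fetcher: the first poll sees a running task, the second sees all finished
    polls = iter([["Running", "Success"], ["Failed", "Success"]])
    print(wait_for_task_states(lambda: next(polls), timeout=5, interval=0.1))
    # → ['Failed', 'Success']
```

Passing the fetcher in as a function keeps the polling logic testable without a Prefect backend.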

Serdar Tumgoren

07/01/2021, 1:53 PM
@Kevin Kho Excellent! Thanks so much for all the helpful details. I’ll give the GraphQL approach a try. And thanks again to @Amanda Wee for all your help! 🙂
Hey @Kevin Kho, sorry to bug you again on this, but I have a follow-up on the GraphQL API approach you suggested. It appears that the API call for `flow_run_by_pk` also does not provide the data I’m attempting to store in the `state_result` field when using Cloud. On closer inspection of Zach’s solution, he doesn’t seem to be trying to access this information. I can, however, access the `state_message`. Is this expected behavior and, if so, is the simplest solution to just parse the data I need from the `state_message`? Here is the GraphQL query I’m using to test (on a failed run that attempted to add task metadata to a Failed state signal):
query {
  flow_run_by_pk(id: "4fff3334-8053-47cc-949c-ed449643a800") {
    flow_id
    name
    state
    task_runs {
      state
      state_result # is still null via API
      state_message # Should I just parse data out of the message?
      start_time
      end_time
      task {
        id
        name
        slug
        type
        mapped
      }
    }
  }
}
And a fragment of the GraphQL JSON response that shows the missing `state_result`:
{
  "data": {
    "flow_run_by_pk": {
      "flow_id": "68a4ba1d-8222-4d64-95c6-a5add2612160",
      "name": "proficient-inchworm",
      "state": "Failed",
      "task_runs": [
        {
          "state": "Failed",
          "state_message": "WARNING: NY scraper does not currently work. Code most be updated. No actions will be performed.",
          "state_result": null,
          "start_time": "2021-07-01T15:12:45.03955+00:00",
          "end_time": "2021-07-01T15:12:45.348463+00:00",
          "task": {
            "id": "09dfd568-2be6-48ef-9fc4-ab8269441bab",
            "name": "scrape_state",
            "slug": "scrape_state-1",
            "type": "prefect.tasks.core.function.FunctionTask",
            "mapped": true
          }
        },

Kevin Kho

07/01/2021, 3:30 PM
I’ll read this whole thread

Serdar Tumgoren

07/01/2021, 3:32 PM
@Kevin Kho OK. Greatly appreciated. Shout back if the flow code would be helpful and I can send that as well

Kevin Kho

07/01/2021, 3:35 PM
OK, I’ll check with the team whether `state_result` is expected to be NULL. It seems like the other fields have already been updated, so it’s not a delay in writing to the database.

Serdar Tumgoren

07/01/2021, 3:36 PM
Ok. Thanks to you and the team for investigating. Really appreciate it!

Kevin Kho

07/01/2021, 3:37 PM
Of course!
Yes, it’s expected to be `null`. It might only be set if you use a Prefect result. I guess you’ll have to use the `state_message`. By the way, the terminal state handler is something we’ll be revisiting, because a lot of people have run into situations similar to yours and the experience has been unsatisfactory.
Also, this whole process might be A LOT easier with the 0.15.0 release today because there will be an interface so you don’t need to perform that query.
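Given a response shaped like the JSON fragment above, the failed task runs and their `state_message` values can be pulled out in plain Python before building the alert. A minimal sketch; the field names match the query above, the sample is trimmed from that response, and `failed_task_messages` is a hypothetical helper:

```python
from typing import Any, Dict, List

# Trimmed from the flow_run_by_pk response above (null becomes None in Python)
sample: Dict[str, Any] = {
    "data": {
        "flow_run_by_pk": {
            "state": "Failed",
            "task_runs": [
                {
                    "state": "Failed",
                    "state_message": "WARNING: NY scraper does not currently work. "
                    "Code most be updated. No actions will be performed.",
                    "state_result": None,
                    "task": {"slug": "scrape_state-1", "mapped": True},
                },
            ],
        }
    }
}


def failed_task_messages(response: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return the task slug and state_message of every failed task run."""
    task_runs = response["data"]["flow_run_by_pk"]["task_runs"]
    return [
        # state_message can be null, so fall back to an empty string
        {"slug": run["task"]["slug"], "message": run["state_message"] or ""}
        for run in task_runs
        if run["state"] == "Failed"
    ]


for item in failed_task_messages(sample):
    print(item["slug"], "->", item["message"])
```

Whatever metadata you need (for example a postal code) then has to be recovered from `message`, so it helps if the code raising the failure writes it in a predictable format.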

Serdar Tumgoren

07/01/2021, 3:44 PM
Wow, I guess I pinged at the right time! Maybe I’ll hold off on this until after I upgrade. But FWIW, this is also a slightly edge-case nice-to-have on my part. Ultimately, I think it’d be fine for me to POST a Slack alert saying a flow run failed (which I already have working), and then direct the user to the Cloud UI to explore the details via the logging panel. I was just trying to get a bit fancier with my Slack alert, which led me down this rabbit hole! 🙂 Anyhow, I’ll check out 0.15.0 when it drops. Meanwhile, I really appreciate you all being so responsive, and please do pass along that so far I’m loving Prefect!

Kevin Kho

07/01/2021, 3:47 PM
Of course! Thanks for your patience also!

Serdar Tumgoren

07/01/2021, 10:00 PM
@Kevin Kho Awesome. Thanks for the heads up!
Hey @Kevin Kho, apologies for reviving this old thread, but I’m having some trouble using the new FlowRunView to accomplish the task outlined here. I assumed that the combination of FlowRunView and TaskRunViews would let me accomplish what seems like a very simple goal: create a Slack alert with metadata from failed tasks once a flow run has finished. I’ve attempted to use FlowRunView both in state handlers and in a basic (non-state-handler) task that checks for the finished state of a flow using FlowRunView. But no matter what I try, the FlowRunView never seems to report a finished state. I can share code if that’s helpful, but wanted to first outline the problem in case I’m somehow misunderstanding or misusing the FlowRunView. Any advice would be greatly appreciated!

Kevin Kho

07/09/2021, 8:52 PM
I wouldn’t expect the FlowRunView to report a finished state while the flow is still running (even in the state handler). Are you using it in the same flow run, grabbing the id from the context? TaskRunView should behave better. How are you using your TaskRunViews, and can you show me your code?

Serdar Tumgoren

07/09/2021, 8:53 PM
Aha, OK, it sounds like I’m definitely using the FlowRunView incorrectly (or have the wrong expectations about its state). One sec, I’ll clean up the file and show the latest approach.
Ok, here’s the cleaned up version of latest attempt with a TODO note marking what I’m up to
Please pardon the cruft (e.g. unused imports from earlier unsuccessful attempts)
I’m wondering if this somehow needs to be run outside the flow’s context manager. But that seems to create a chicken-and-egg problem related to using FlowRunView (i.e. I wasn’t able to access the flow run ID via prefect.context to instantiate FlowRunView outside the “with” statement).
@Kevin Kho oh wait, is the current issue that FlowRunView is declared in a task used inside the flow context manager? Which technically would always still be running inside the flow context?

Kevin Kho

07/09/2021, 9:01 PM
Gotcha. I think this final notification should not be a task. It should be a state handler attached to the flow. The `FlowRunView` will always be in a running state while the flow is running. The state handler will be triggered right before the flow finishes, and the state handler will be responsible for using `TaskRunView` to get that info.
I need a bit of time to make a minimal example, but this is a similar example: https://github.com/PrefectHQ/prefect/discussions/4750
s

Serdar Tumgoren

07/09/2021, 9:03 PM
@Kevin Kho Aha. OK, that makes sense. I tried the state handler approach with FlowRunView, but it sounds like TaskRunView is how I should approach this. I’ll check out the example you sent, and I’d really appreciate any more minimal example you can pass along. Thanks!

Kevin Kho

07/09/2021, 9:06 PM
Is the purpose of this just when something fails, you want to figure out which one it was with a Slack message?

Serdar Tumgoren

07/09/2021, 9:08 PM
End goal is to be able to send a Slack alert when the run is completed. This is for a project that scrapes data from 50 US States. I’d like my alert message to contain a list of postal codes for the state scrapers that succeeded and failed. I’m trying to avoid slack alerts for each state scraper b/c the logging in Slack is too verbose. My goal is to somehow examine the state of all mapped tasks at end of the run. Then use the metadata from the succeeded results and metadata attached to failed signals to compile a single slack alert message.
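One way to turn the two lists of postal codes into a single message is sketched below, assuming a Slack incoming webhook, which accepts a JSON body with a `text` field. Only the payload is built here; posting it (for example with `requests.post(webhook_url, json=payload)`) is left out, and `build_summary_payload` plus the flow name in the usage line are hypothetical:

```python
import json
from typing import Dict, Iterable


def build_summary_payload(
    flow_name: str, succeeded: Iterable[str], failed: Iterable[str]
) -> Dict[str, str]:
    """Build one Slack webhook payload summarizing per-state scraper outcomes."""
    ok = sorted(succeeded)
    bad = sorted(failed)
    # *bold* is Slack mrkdwn; one header line, then failures first
    lines = [f"*{flow_name}* finished: {len(ok)} succeeded, {len(bad)} failed"]
    if bad:
        lines.append("Failed: " + ", ".join(bad))
    if ok:
        lines.append("Succeeded: " + ", ".join(ok))
    return {"text": "\n".join(lines)}


payload = build_summary_payload("state-scrapers", ["CA", "AK"], ["NY"])
print(json.dumps(payload))
```

Sorting the codes keeps the message stable across runs, which makes successive alerts easier to compare.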
(apologies for overloading the word “state”, btw; I happen to be scraping US state data, and I realize this could get confusing fast…)

Kevin Kho

07/09/2021, 9:10 PM
Gotcha ok i’ll try to make something

Serdar Tumgoren

07/09/2021, 9:10 PM
You rock. I’ll keep tinkering as well. There are still a few things I haven’t tried, but I really appreciate the sanity check and guidance!
@Kevin Kho Hey, it’s Friday and this is not urgent, but wanted to pass along another example that gets close to what I’m aiming for

Kevin Kho

07/09/2021, 10:39 PM
I have been running into issues with TaskRunView that I’ll need to ask the team about, but I have a naive solution for you for now:
# Import libraries
import prefect
from prefect import task, Flow
from prefect.triggers import always_run


# Custom exception; the failing input is embedded in its message
class AnError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


# Mapped task that fails for one input
@task()
def scrape_state(state):
    logger = prefect.context.get("logger")
    logger.info(str(state))
    if state == "C":
        raise AnError(f"Error Scraping {state}")
    else:
        return state


# always_run: execute even when some mapped children failed
@task(trigger=always_run)
def something_else(x):
    logger = prefect.context.get("logger")

    # x here will be a list of successes and errors (a failed child passes its exception)
    successes = [_ for _ in x if not isinstance(_, Exception)]
    errors = [_.message for _ in x if isinstance(_, Exception)]
    # Recover the input from the "Error Scraping <input>" message
    errors = [_.split("Error Scraping ")[1] for _ in errors]

    logger.info("successes")
    logger.info(successes)
    logger.info("errors")
    logger.info(errors)
    return x


with Flow("flow_must_fail") as flow:
    states = ["A", "B", "C"]
    x = scrape_state.map(states)
    y = something_else(x, upstream_tasks=[x])

flow.run()
Basically, pass the mapped task’s results downstream to another task responsible for pulling out the successes and failures. Embed the input in the error message so that you can propagate it down to that downstream task.

Serdar Tumgoren

07/09/2021, 10:43 PM
@Kevin Kho Aha, ok, that’s very clever! I was about to give up on this b/c it seemed that with my most recent script (that I just posted above), I would have to give up the benefits of full stack trace logging by raising signals in the task that (potentially) produces errors. But your approach seems to give me the best of all worlds. I’ll take it for a test spin this weekend. many thanks!!

Kevin Kho

07/09/2021, 10:46 PM
Better version:
# Import libraries
import prefect
from prefect import task, Flow
from prefect.triggers import always_run


# Custom exception that carries the failing input as an attribute
class AnError(Exception):
    def __init__(self, message, input):
        self.message = message
        self.input = input
        super().__init__(self.message)


# Mapped task that fails for one input
@task()
def scrape_state(state):
    logger = prefect.context.get("logger")
    logger.info(str(state))
    if state == "C":
        raise AnError("Error Scraping", input=state)
    else:
        return state


# always_run: execute even when some mapped children failed
@task(trigger=always_run)
def something_else(x):
    logger = prefect.context.get("logger")

    # x here will be a list of successes and errors (a failed child passes its exception)
    successes = [_ for _ in x if not isinstance(_, Exception)]
    errors = [_.input for _ in x if isinstance(_, Exception)]

    logger.info("successes")
    logger.info(successes)
    logger.info("errors")
    logger.info(errors)
    return x


with Flow("flow_must_fail") as flow:
    states = ["A", "B", "C"]
    x = scrape_state.map(states)
    y = something_else(x, upstream_tasks=[x])

flow.run()
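One thing to watch if this moves to a `DaskExecutor` with worker processes: results, including exceptions returned by failed tasks, can be pickled as they move between processes, and an exception whose `__init__` requires an extra argument does not survive a plain pickle round trip. Unpickling calls the class with `self.args`, which here only holds the message, so the required `input` argument is missing. A minimal stdlib sketch of the problem and one fix via `__reduce__`; `PicklableError` and `round_trips` are hypothetical names:

```python
import pickle


class AnError(Exception):
    """Same shape as the AnError above: only the message reaches Exception.__init__."""

    def __init__(self, message, input):
        self.message = message
        self.input = input
        super().__init__(self.message)


class PicklableError(Exception):
    """Variant that tells pickle how to rebuild it with both arguments."""

    def __init__(self, message, input):
        self.message = message
        self.input = input
        super().__init__(self.message)

    def __reduce__(self):
        # Rebuild with (message, input) instead of the default self.args
        return (type(self), (self.message, self.input))


def round_trips(exc):
    """Return True if exc survives pickle.dumps/pickle.loads with its input intact."""
    try:
        restored = pickle.loads(pickle.dumps(exc))
    except TypeError:
        # Raised when unpickling calls __init__ without the required 'input'
        return False
    return restored.input == exc.input


print(round_trips(AnError("Error Scraping", input="C")))         # False
print(round_trips(PicklableError("Error Scraping", input="C")))  # True
```

Whether Prefect's Dask integration actually hits this path depends on where the downstream task runs, so treat this as a precaution rather than a confirmed failure mode.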
Oh yeah, you can also `raise FAIL(message)` and put the input in the message.
I actually prefer this to the TaskRunView since there’s no need to query the API or anything
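If the input goes into a `FAIL` message instead, giving it a fixed, parseable shape lets the downstream task (or a Slack handler reading `state_message`) recover it reliably. A minimal stdlib sketch; the `[code=XX]` tag format and both helper names are hypothetical conventions, not anything Prefect defines. Inside a task, the message would be passed as `raise FAIL(failure_message(state, "..."))`:

```python
import re
from typing import Optional

# Hypothetical tag format: a two-letter postal code in square brackets
_TAG = re.compile(r"\[code=([A-Z]{2})\]")


def failure_message(code: str, detail: str) -> str:
    """Build a message like '[code=NY] scraper does not currently work'."""
    return f"[code={code}] {detail}"


def code_from_message(message: Optional[str]) -> Optional[str]:
    """Return the two-letter code embedded by failure_message, or None."""
    match = _TAG.search(message or "")
    return match.group(1) if match else None


msg = failure_message("NY", "scraper does not currently work")
print(msg)                                   # [code=NY] scraper does not currently work
print(code_from_message(msg))                # NY
print(code_from_message("Task run failed"))  # None
```

A regex anchored on an explicit tag is less brittle than splitting on free-form text such as `"Error Scraping "`, because the rest of the message can change without breaking the parser.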

Serdar Tumgoren

07/09/2021, 11:15 PM
Interesting. OK, I’ll try porting this approach to my real use case. Thanks again for all the help, and let me know if you hear back any relevant details from the rest of the team about TaskRunViews. Have a great weekend!