Serdar Tumgoren
07/01/2021, 12:31 AM
new_state.result inside the state handler is where the problem lies. Locally, it is a dict populated with (<task>, <state>) items; on Server or Cloud, new_state.result appears to return an empty dict.
I've created two basic flow scripts to demonstrate this behavior (one for a local manual run, the other for Server or Cloud) and will attach them below. Each contains a NOTE marking the divergent status of new_state.result. I would be indebted if someone could advise on what I'm doing wrong!
python test_flow.py issues the expected logging (see toward the bottom of the script).
Amanda Wee
07/01/2021, 1:59 AM
raise signals.FAIL(e, result=(x, y))
If I'm reading the code right, this should set the result of the Failed state associated with the FAIL signal to (x, y), so new_state.result in the state handler should have it.
Serdar Tumgoren
07/01/2021, 2:20 AM
Amanda Wee
07/01/2021, 2:21 AM
Serdar Tumgoren
07/01/2021, 2:23 AM
Amanda Wee
07/01/2021, 2:37 AM
import prefect
from prefect import task, Flow
from prefect.engine import signals


class FlowFailureData:
    # Singleton holder for the failures collected during one flow run
    instance = None

    def __init__(self):
        self.failures = []

    def __str__(self):
        output = ""
        for failure in self.failures:
            output += f"{failure[0]}: {failure[1]}\n"
        return output


def handle_flow_failure_data(flow, old_state, new_state):
    logger = prefect.context.get('logger')
    flow_name = prefect.context.flow_name
    msg = flow_name
    if new_state.is_running():
        # Start a fresh failure list when the flow run starts
        FlowFailureData.instance = FlowFailureData()
    elif new_state.is_finished():
        if new_state.is_successful():
            msg += " ran successfully."
            logger.info(msg)
        elif FlowFailureData.instance.failures:
            msg += " had failures:\n"
            msg += str(FlowFailureData.instance)
            logger.info(msg)
    return new_state


def notify_on_finish(task, old_state, new_state):
    # Record each failed (mapped) task run together with its result
    if new_state.is_failed():
        FlowFailureData.instance.failures.append((new_state, new_state.result))
    return new_state


@task(state_handlers=[notify_on_finish])
def div(x, y):
    try:
        return x / y
    except Exception as e:
        # Attach the inputs as the result of the Failed state
        raise signals.FAIL(e, result=(x, y))


with Flow('Test - flow state handler', state_handlers=[handle_flow_failure_data]) as flow:
    x_vals = [4, 2]
    y_vals = [2, 0]
    div.map(x=x_vals, y=y_vals)

#flow.run()
flow.register(project_name="Test flows")
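One caveat worth checking before moving this to Dask: FlowFailureData keeps its list in a class attribute, so it lives in the memory of whichever Python process touches it. As far as I understand Prefect 1.x, task state handlers run wherever the task itself runs, while the flow state handler runs in the flow-run process, so the two only share that list when tasks execute in threads of the same process (for example LocalExecutor, or LocalDaskExecutor with its default threads scheduler). The stdlib-only sketch below uses a hypothetical FailureRegistry as a stand-in for FlowFailureData and shows what happens when the append happens in a thread versus in a separate process.

```python
import multiprocessing
import threading


class FailureRegistry:
    # Hypothetical stand-in for FlowFailureData: a class-level list that
    # lives only in the memory of the current Python process.
    failures = []


def record_failure(label):
    # Stand-in for a task state handler appending to the shared registry.
    FailureRegistry.failures.append(label)


def run_in_thread(label):
    # Threads share the parent's memory, so the append is visible afterwards.
    worker = threading.Thread(target=record_failure, args=(label,))
    worker.start()
    worker.join()


def run_in_process(label):
    # A child process appends to its own copy of the class attribute;
    # the parent's list is never updated.
    worker = multiprocessing.Process(target=record_failure, args=(label,))
    worker.start()
    worker.join()


def demo():
    FailureRegistry.failures.clear()
    run_in_thread("failure recorded in a thread")
    run_in_process("failure recorded in another process")
    return list(FailureRegistry.failures)


if __name__ == "__main__":
    # Only the thread's entry is visible in the parent process.
    print(demo())
```

With a distributed DaskExecutor, the same isolation would presumably apply between the workers and the flow-run process, so the failure details would need to travel through something both sides can see, such as task results or state messages.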
Serdar Tumgoren
07/01/2021, 1:18 PM
flow.get_tasks, for example, but the inputs on the tasks appeared to be Prefect metadata as opposed to the actual values passed in as task arguments. Perhaps I'm missing something there?
I should also mention that I'll eventually be running this in parallel with the DaskExecutor, so I'm not sure whether that poses any potential landmines for the singleton approach. I'll test that out with the LocalDaskExecutor to verify. In the meantime, let me know if you think I can use a simpler inputs approach. And many, many thanks for your help so far!
Kevin Kho
07/01/2021, 1:40 PM
Serdar Tumgoren
07/01/2021, 1:46 PM
Kevin Kho
07/01/2021, 1:51 PM
Serdar Tumgoren
07/01/2021, 1:53 PM
flow_run_by_pk also does not provide the data I'm attempting to store in the state_result field when using Cloud. On closer inspection of Zach's solution, he doesn't seem to be trying to access this information. I can, however, access the state_message. Is this expected behavior and, if so, is the simplest solution to just parse the data I need from the state_message? Here is the GraphQL query I'm using to test (on a failed run that attempted to add task metadata to a Failed state signal):
query {
  flow_run_by_pk(id: "4fff3334-8053-47cc-949c-ed449643a800") {
    flow_id
    name
    state
    task_runs {
      state
      state_result   # is still null via API
      state_message  # Should I just parse data out of the message?
      start_time
      end_time
      task {
        id
        name
        slug
        type
        mapped
      }
    }
  }
}
And a fragment of the GraphQL JSON response that shows the missing state_result:
{
  "data": {
    "flow_run_by_pk": {
      "flow_id": "68a4ba1d-8222-4d64-95c6-a5add2612160",
      "name": "proficient-inchworm",
      "state": "Failed",
      "task_runs": [
        {
          "state": "Failed",
          "state_message": "WARNING: NY scraper does not currently work. Code most be updated. No actions will be performed.",
          "state_result": null,
          "start_time": "2021-07-01T15:12:45.03955+00:00",
          "end_time": "2021-07-01T15:12:45.348463+00:00",
          "task": {
            "id": "09dfd568-2be6-48ef-9fc4-ab8269441bab",
            "name": "scrape_state",
            "slug": "scrape_state-1",
            "type": "prefect.tasks.core.function.FunctionTask",
            "mapped": true
          }
        },
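If the data does have to come back through state_message, it is easier to recover when it is written there in a parseable form. A minimal sketch, assuming a hypothetical convention where the readable message is followed by a marker and a JSON payload; the marker string and both helper names are made up for illustration. The built message would be what gets passed to signals.FAIL, and the parser would run on the state_message returned by the query above.

```python
import json

# Hypothetical marker separating the readable text from the JSON payload.
# Assumes the readable part of the message never contains this marker.
INPUTS_MARKER = " | task-inputs="


def build_message(text, **inputs):
    """Append the task inputs to a readable message as JSON."""
    return text + INPUTS_MARKER + json.dumps(inputs, sort_keys=True)


def parse_inputs(state_message):
    """Return the inputs dict embedded by build_message, or None if absent."""
    if not state_message or INPUTS_MARKER not in state_message:
        return None
    _, _, payload = state_message.partition(INPUTS_MARKER)
    return json.loads(payload)


msg = build_message("WARNING: NY scraper does not currently work.", state="NY")
print(msg)
print(parse_inputs(msg))
```

Inside the task, the failure would then be raised with something like raise signals.FAIL(build_message("Error scraping", state=state)), where build_message is the hypothetical helper above; messages that were not built this way simply parse to None.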
Kevin Kho
07/01/2021, 3:30 PM
Serdar Tumgoren
07/01/2021, 3:32 PM
Kevin Kho
07/01/2021, 3:35 PM
Serdar Tumgoren
07/01/2021, 3:36 PM
Kevin Kho
07/01/2021, 3:37 PM
null. It might only be set if you use a Prefect result. I guess you'll have to use the state_message. By the way, the terminal state handler is something we'll be revisiting, because a lot of people have run into situations similar to yours and the experience has been unsatisfactory.
Serdar Tumgoren
07/01/2021, 3:44 PM
Kevin Kho
07/01/2021, 3:47 PM
Serdar Tumgoren
07/01/2021, 10:00 PM
Kevin Kho
07/09/2021, 8:52 PM
Serdar Tumgoren
07/09/2021, 8:53 PM
Kevin Kho
07/09/2021, 9:01 PM
state handler, and then the state handler will be attached to the flow. The FlowRunView will always be in a Running state while the flow is running. The state handler will be triggered right before the flow finishes, and it will be responsible for using the TaskRunView to get that info.
Serdar Tumgoren
07/09/2021, 9:03 PM
Kevin Kho
07/09/2021, 9:06 PM
Serdar Tumgoren
07/09/2021, 9:08 PM
Kevin Kho
07/09/2021, 9:10 PM
Serdar Tumgoren
07/09/2021, 9:10 PM
Kevin Kho
07/09/2021, 10:39 PM
# Import Library
import prefect
from prefect import task, Flow
from prefect.triggers import always_run
from datetime import timedelta
from prefect.backend.flow_run import FlowRunView, TaskRunView
from typing import Dict
import time


# I need this to hold the task inputs
class AnError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


# Error Task
@task()
def scrape_state(state):
    logger = prefect.context.get("logger")
    logger.info(str(state))
    if state == "C":
        raise AnError(f"Error Scraping {state}")
    else:
        return state


@task(trigger=always_run)
def something_else(x):
    logger = prefect.context.get("logger")
    # x here will be a list of successes and errors
    successes = [_ for _ in x if not isinstance(_, Exception)]
    errors = [_.message for _ in x if isinstance(_, Exception)]
    # Recover the failing input by stripping the "Error Scraping " prefix
    errors = [_.split("Error Scraping ")[1] for _ in errors]
    logger.info("successes")
    logger.info(successes)
    logger.info("errors")
    logger.info(errors)
    return x


with Flow("flow_must_fail") as flow:
    states = ["A", "B", "C"]
    x = scrape_state.map(states)
    y = something_else(x, upstream_tasks=[x])

flow.run()
Serdar Tumgoren
07/09/2021, 10:43 PMKevin Kho
07/09/2021, 10:46 PM# Import Library
import prefect
from prefect import task, Flow
from datetime import timedelta
from prefect.backend.flow_run import FlowRunView, TaskRunView
from typing import Dict
import time
# I need this to hold the task inputs
class AnError(Exception):
def __init__(self, message, input):
self.message = message
self.input = input
super().__init__(self.message)
# Error Task
@task()
def scrape_state(state):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(str(state))
if state == "C":
raise AnError(f"Error Scraping", input=state)
else:
return state
from prefect.triggers import always_run
@task(trigger=always_run)
def something_else(x):
logger = prefect.context.get("logger")
# x here will be a list of successes and errors
successes = [_ for _ in x if not isinstance(_, Exception)]
errors = [_.input for _ in x if isinstance(_, Exception)]
<http://logger.info|logger.info>("successes")
<http://logger.info|logger.info>(successes)
<http://logger.info|logger.info>("errors")
<http://logger.info|logger.info>(errors)
return x
with Flow("flow_must_fail") as flow:
states = ["A", "B", "C"]
x = scrape_state.map(states)
y = something_else(x, upstream_tasks = [x])
flow.run()
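One thing to watch if this version later runs on a DaskExecutor, where task results may need to be pickled to cross process boundaries: an Exception subclass whose __init__ requires an extra argument that it does not pass to super().__init__() can be pickled but not unpickled, because unpickling re-creates the exception from self.args alone. The stdlib sketch below reproduces that with the same AnError shape as the script above, and shows one possible fix, a __reduce__ override on a variant whose name, AnErrorPicklable, is made up for illustration.

```python
import pickle


class AnError(Exception):
    # Same shape as the AnError in the script above.
    def __init__(self, message, input):
        self.message = message
        self.input = input
        super().__init__(self.message)  # self.args == (message,)


class AnErrorPicklable(Exception):
    # Hypothetical variant that tells pickle how to rebuild it.
    def __init__(self, message, input):
        self.message = message
        self.input = input
        super().__init__(self.message)

    def __reduce__(self):
        # Rebuild from both constructor arguments instead of self.args.
        return (type(self), (self.message, self.input))


def round_trip(exc):
    """Pickle and unpickle an exception, as a process boundary would."""
    return pickle.loads(pickle.dumps(exc))


try:
    round_trip(AnError("Error Scraping", input="C"))
except TypeError as err:
    # Unpickling calls AnError("Error Scraping") and misses `input`.
    print("unpickling failed:", err)

restored = round_trip(AnErrorPicklable("Error Scraping", input="C"))
print(restored.message, restored.input)
```

Passing both values to super().__init__(message, input) would also make the original class picklable, at the cost of str(err) becoming the tuple ('Error Scraping', 'C'); the __reduce__ override keeps the message text unchanged.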
raise FAIL(message) and put it in the message.
Serdar Tumgoren
07/09/2021, 11:15 PM