Anze Kravanja
07/29/2021, 4:42 PM
for t in flow.tasks:
    tr = state.result.get(t, None)
    if not tr:
        continue
    flow.logger.info(f"Task name '{t.name}' -> State '{tr.message}' -> Is failed: {tr.is_failed()}")
    if tr.is_failed():
        err_params['tasks'].append(OrderedDict({
            'taskName': t.name,
            'errorMessage': tr.message,
            'errorParams': tr.result if isinstance(tr.result, dict) else str(tr.result)
        }))
Basically, I’m just going through all the tasks and checking whether is_failed() is true; if so, I grab some info.
This all works as intended locally, but when I package my flows in a Docker image and run them with the Docker agent, state.result turns out to be an empty dictionary.
Locally, I found each task’s result there.
I’ve played with GCSResult and with just leaving the result at its default, but in both cases state.result={} when running in Docker.
Any ideas what I might be doing wrong?
Kevin Kho
time.sleep(5) to allow it to sync.
Anze Kravanja
07/29/2021, 5:07 PM
Anze Kravanja
07/29/2021, 5:58 PM
Kevin Kho
Billy McMonagle
07/29/2021, 8:03 PM
def on_failure_post_to_slack(flow, old_state, new_state):
    if new_state.is_failed():
        msg = f"Flow {flow.name} finished in state {new_state}"
        requests.post(SLACK_WEBHOOK_URL, json={"text": msg})
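A handler like the one above is hard to try out without a real webhook. The following is only a sketch of one way to make it testable: the HTTP call is passed in as a callable, and the handler returns new_state. The names make_failure_notifier and FakeState are hypothetical, and FakeState is a stand-in for a Prefect state, not Prefect's own class.

```python
from types import SimpleNamespace
from typing import Callable, List


def make_failure_notifier(post: Callable[[str], None]):
    """Build a state handler that calls post(text) when the new state is failed."""

    def handler(flow, old_state, new_state):
        if new_state.is_failed():
            post(f"Flow {flow.name} finished in state {new_state}")
        # Hand the state back unchanged so the handler never swallows it.
        return new_state

    return handler


class FakeState:
    """Hypothetical stand-in for a Prefect state; only what the handler uses."""

    def __init__(self, failed: bool, message: str):
        self._failed = failed
        self.message = message

    def is_failed(self) -> bool:
        return self._failed

    def __repr__(self) -> str:
        kind = "Failed" if self._failed else "Success"
        return f'<{kind}: "{self.message}">'


# Collect messages in a list instead of posting them to Slack.
sent: List[str] = []
handler = make_failure_notifier(sent.append)
flow = SimpleNamespace(name="Query1")

handler(flow, None, FakeState(False, "All reference tasks succeeded."))
handler(flow, None, FakeState(True, "Some reference tasks failed."))
print(sent)
```

In a real flow, post would presumably wrap the requests.post call to SLACK_WEBHOOK_URL from the snippet above.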
Billy McMonagle
07/29/2021, 8:04 PM
Kevin Kho
task.result.
# Import libraries
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult


class AnotherError(Exception):
    def __init__(self, message, inputs: Dict):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)


def myStateHandler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)


def onFailureTask(task, state):
    logger = prefect.context.get("logger")
    # Task Variables
    logger.info("Task inputs")
    logger.info(state)
    logger.info(state.result.inputs)
    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
    logger.info("Task Result")
    logger.info(task.result)


@task(state_handlers=[myStateHandler], result=GCSResult(bucket="prefect-kvnkho", location="1.txt"))
def abc(x):
    return x


# Error Task
@task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
def myTask(str_param):
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})


with Flow("flow_must_fail") as flow:
    param1 = Parameter('param1', default="XXX")
    a = abc(param1)
    b = myTask(a)

flow.run()
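The part of the example that carries the task inputs into the failure handler is the custom exception, and it can be tried without Prefect or GCS. The sketch below assumes, as the state.result.inputs line above implies, that a failed state's result is the raised exception. FakeFailedState and describe_failure are hypothetical names, and the stand-in's message is simply str(exc), which may differ from the message Prefect generates.

```python
from typing import Any, Dict


class AnotherError(Exception):
    """Exception that remembers the inputs of the task that raised it."""

    def __init__(self, message: str, inputs: Dict[str, Any]):
        self.message = message
        self.inputs = inputs
        super().__init__(message)


class FakeFailedState:
    """Hypothetical stand-in for a failed state: only .message and .result."""

    def __init__(self, exc: Exception):
        self.message = str(exc)
        self.result = exc


def describe_failure(state) -> Dict[str, Any]:
    """Pull the message and, when available, the inputs out of a failed state."""
    inputs = getattr(state.result, "inputs", None)
    return {
        "errorMessage": state.message,
        # Fall back to the string form for exceptions without .inputs.
        "errorParams": inputs if isinstance(inputs, dict) else str(state.result),
    }


try:
    raise AnotherError("Error Message Is Here", inputs={"str_param": "XXX"})
except AnotherError as exc:
    report = describe_failure(FakeFailedState(exc))

print(report)
```

The same dictionary shape as in the original question (errorMessage, errorParams) is used here so the two snippets can be compared.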
Billy McMonagle
07/29/2021, 8:15 PM
Kevin Kho
Billy McMonagle
07/29/2021, 8:17 PM
Billy McMonagle
07/29/2021, 8:17 PM
# message from flow level state handler
Flow Query1 finished in state <Failed: "Some reference tasks failed.">
# message from task level state handler
Flow execute finished in state <Failed: "Unable to locate query file /usr/src/queries/sql/query1/query.sql">
Kevin Kho
Billy McMonagle
07/29/2021, 8:19 PM
Result objects yet, but in fact they seem useful
Billy McMonagle
07/29/2021, 8:19 PM
Kevin Kho
Billy McMonagle
07/29/2021, 8:29 PM
Kevin Kho
context though
Billy McMonagle
07/29/2021, 8:31 PM
Anze Kravanja
07/29/2021, 11:46 PM
Kevin Kho
SUCCESS because that is also the backbone of flow re-runs. If you retry from failure, it pulls the persisted result of the upstream task to avoid re-running tasks that already succeeded. I don’t think there is any meaning in loading an error. I think this separates data concerns from orchestration concerns.
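Now, for you, I think you can do this manually, but it’s a bit tricky. You can use the Result interface like:
from prefect import task
from prefect.engine.result import Result

# Result stands in for a concrete class such as LocalResult or GCSResult
@task
def abc(x):
    # write() returns a new Result that carries the final location
    res = Result(location=...).write(x)
    return res.location

@task
def abc2(location):
    # read() returns a Result; the stored value is on .value
    res = Result().read(location)
    return res.value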
This gives you more control over what is persisted, as you can force it to persist errors. There is one caveat, though, that makes this tricky: Results are paired with Serializers such as JSONSerializer and PandasSerializer. Serializing an exception as a string is probably fine if all your other data is JSON-serializable, but you can’t mix and match depending on the type of data you’re getting unless you use a serializer that does nothing (which you can write in 5 lines of code, no problem).
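A serializer that does nothing could look roughly like the sketch below. It assumes the serializer interface is a serialize(value) method returning bytes and a deserialize(bytes) method returning the value. NoOpSerializer and encode_outcome are hypothetical names, and the class is written standalone here rather than subclassing Prefect's Serializer so that it runs without Prefect installed. Since the serializer passes bytes through untouched, the task itself decides how to encode a value or an error.

```python
import json
from typing import Any


class NoOpSerializer:
    """Hypothetical pass-through serializer: the caller already supplies bytes."""

    def serialize(self, value: bytes) -> bytes:
        if not isinstance(value, (bytes, bytearray)):
            raise TypeError("NoOpSerializer expects bytes")
        return bytes(value)

    def deserialize(self, value: bytes) -> bytes:
        return value


def encode_outcome(outcome: Any) -> bytes:
    """Encode either an exception or JSON-compatible data as UTF-8 JSON bytes."""
    if isinstance(outcome, BaseException):
        payload = {
            "kind": "error",
            "type": type(outcome).__name__,
            "message": str(outcome),
        }
    else:
        payload = {"kind": "value", "data": outcome}
    return json.dumps(payload).encode("utf-8")


serializer = NoOpSerializer()
raw_error = encode_outcome(ValueError("boom"))
raw_value = encode_outcome({"rows": 3})

# Round trip: what goes in is exactly what comes back out.
assert serializer.deserialize(serializer.serialize(raw_error)) == raw_error
print(json.loads(raw_error))
print(json.loads(raw_value))
```

With this split, the reader of a persisted result checks the "kind" field to tell an error from a value, which sidesteps choosing a different serializer per data type.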