Hugo Shi
07/19/2021, 6:19 PM
Kevin Kho
Hugo Shi
07/19/2021, 6:24 PM
Hugo Shi
07/19/2021, 6:24 PM
Hugo Shi
07/19/2021, 6:25 PM
task.run
Hugo Shi
07/19/2021, 6:26 PM
Kevin Kho
Restart would not work for you, right?
Hugo Shi
07/19/2021, 6:28 PM
Kevin Kho
state handler is hit, you can send this info through Slack or you can write it out. What do you think of this?
Kevin Kho
flow.run() call.
Hugo Shi
07/19/2021, 6:31 PM
Kevin Kho
Kevin Kho
Hugo Shi
07/19/2021, 11:10 PM
Kevin Kho
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult


class AnotherError(Exception):
    # Custom exception that carries the task inputs alongside the message
    def __init__(self, message, inputs: Dict):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)


def myStateHandler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)
    return new_state


def onFailureTask(task, state):
    logger = prefect.context.get("logger")
    # Task Variables
    logger.info("Task inputs")
    logger.info(state)
    logger.info(state.result.inputs)
    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
    logger.info("Task Result")
    logger.info(task.result)


@task(state_handlers=[myStateHandler], result=GCSResult(bucket="prefect-kvnkho", location="1.txt"))
def abc(x):
    return x


# Error Task
@task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
def myTask(str_param):
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})


with Flow("flow_must_fail") as flow:
    param1 = Parameter('param1', default="XXX")
    a = abc(param1)
    b = myTask(a)

flow.run()
Hugo Shi
07/20/2021, 5:46 PM
def state_handler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_failed() or new_state.is_finished():
        logger.info(f"State: {new_state}")
        logger.info(new_state)
        logger.info(new_state.result.inputs)
        logger.info("Parameters")
        logger.info(prefect.context.get("parameters", {}))
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)
Hugo Shi
07/20/2021, 5:47 PM
AttributeError: 'datetime.datetime' object has no attribute 'inputs'
Kevin Kho
result.inputs is the input attribute of the AnotherError class I used. I suppose you could use the inputs if new_state.is_failed() and you can use result.value if new_state.is_finished()?
Hugo Shi
07/20/2021, 5:48 PM
Hugo Shi
07/20/2021, 5:50 PM
Kevin Kho
Kevin Kho
Hugo Shi
07/21/2021, 3:29 AM
Kevin Kho
Hugo Shi
07/24/2021, 4:01 PM
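For anyone reading this thread later, the suggestion above (read the inputs only when the state is failed, and read the plain value otherwise) might look roughly like the sketch below. It is not taken from the thread: it assumes the state's result is the raised exception on failure and the task's return value on success, which is what the AttributeError on a datetime seems to indicate. `describe_state` and `FakeState` are hypothetical names used only so the branching can be tried without Prefect or GCS.

```python
import datetime
import logging
from typing import Any, Dict

logger = logging.getLogger("state_handler_sketch")


class AnotherError(Exception):
    """Exception that carries the task inputs, as in the flow above."""

    def __init__(self, message: str, inputs: Dict[str, Any]):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)


def describe_state(new_state) -> str:
    """Summarise a state without assuming its result has `.inputs`."""
    if new_state.is_failed():
        # Assumption: on failure the result is the raised exception.
        # Only AnotherError has `.inputs`, so fall back to None otherwise.
        inputs = getattr(new_state.result, "inputs", None)
        return f"failed with inputs={inputs!r}"
    if new_state.is_finished():
        # Assumption: on success the result is the task's return value,
        # for example a datetime, which has no `.inputs` attribute.
        return f"finished with value={new_state.result!r}"
    return "still running"


def state_handler(task, old_state, new_state):
    """Handler with the (task, old_state, new_state) signature used above."""
    logger.info(describe_state(new_state))
    return new_state


class FakeState:
    """Hypothetical stand-in for a state object, only for trying the logic."""

    def __init__(self, result, failed: bool, finished: bool = True):
        self.result = result
        self._failed = failed
        self._finished = finished

    def is_failed(self) -> bool:
        return self._failed

    def is_finished(self) -> bool:
        return self._finished


# Try both branches with the stand-in states.
failed = FakeState(AnotherError("boom", inputs={"str_param": "XXX"}), failed=True)
succeeded = FakeState(datetime.datetime(2021, 7, 20), failed=False)
print(describe_state(failed))
print(describe_state(succeeded))
```

Checking `is_failed()` before `is_finished()` matters here because a failed state also counts as finished, so the order decides which branch a failure takes.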