Alfie
07/27/2021, 12:03 AM
Kevin Kho
07/27/2021, 12:06 AM
`.result.location` and then load it. If it’s something like an input, you have to do something like this:
Edited: wrong code snippet, looking for the right one.
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult
class AnotherError(Exception):
    """Exception that carries a message together with a dict of inputs.

    Both values are stored as attributes so that code holding the raised
    exception can read them back (``.message`` and ``.inputs``).
    """

    def __init__(self, message, inputs: Dict):
        """Store ``message`` and ``inputs`` and initialise the base exception.

        Args:
            message: Error text; also passed to ``Exception.__init__`` so
                that ``str(exc)`` yields it.
            inputs: Dictionary of values to attach to the exception.
        """
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)
def myStateHandler(task, old_state, new_state):
    """Log the task's ``result`` attribute and its location on finish.

    Args:
        task: The task whose state is changing.
        old_state: The state being left (unused).
        new_state: The state being entered; logging only happens when
            ``new_state.is_finished()`` is true.

    Returns:
        ``new_state`` unchanged.
    """
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info("Task Result")
        # NOTE(review): presumably ``task.result`` is the Result object passed
        # to ``@task(result=...)`` rather than the task's return value — confirm.
        logger.info(task.result)
        logger.info(task.result.location)
    # Hand the state back explicitly so the transition is left untouched.
    return new_state
def onFailureTask(task, state):
    """Log diagnostic information about a failed task.

    Logs the state, the ``inputs`` attached to the state's result, the
    ``parameters`` entry of the Prefect context, and ``task.result``.

    Args:
        task: The task that failed.
        state: The state passed to the callback.
    """
    logger = prefect.context.get("logger")

    # Task Variables
    logger.info("Task inputs")
    logger.info(state)
    # NOTE(review): assumes ``state.result`` is an exception exposing
    # ``.inputs`` (e.g. AnotherError); any other result type would raise
    # AttributeError here — confirm.
    logger.info(state.result.inputs)

    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))

    logger.info("Task Result")
    logger.info(task.result)
@task(
    state_handlers=[myStateHandler],
    result=GCSResult(bucket="prefect-kvnkho", location="1.txt"),
)
def abc(x):
    """Return ``x`` unchanged.

    The task is declared with a ``GCSResult`` and with ``myStateHandler``
    attached as a state handler.
    """
    return x
# Error Task
@task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
def myTask(str_param):
    """Log ``str_param`` and then always fail with ``AnotherError``.

    The input is attached to the exception under the key ``'str_param'``.

    Args:
        str_param: Value to log and attach to the raised error.

    Raises:
        AnotherError: Always.
    """
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})
# Build the flow: the parameter feeds ``abc``, whose output feeds ``myTask``.
with Flow("flow_must_fail") as flow:
    param1 = Parameter('param1', default="XXX")
    a = abc(param1)
    b = myTask(a)

# Run the flow locally.
flow.run()
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult
class AnotherError(Exception):
    """Exception that carries a message together with a dict of inputs.

    Both values are stored as attributes so that code holding the raised
    exception can read them back (``.message`` and ``.inputs``).
    """

    def __init__(self, message, inputs: Dict):
        """Store ``message`` and ``inputs`` and initialise the base exception.

        Args:
            message: Error text; also passed to ``Exception.__init__`` so
                that ``str(exc)`` yields it.
            inputs: Dictionary of values to attach to the exception.
        """
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)
def onFailureTask(task, old_state, new_state):
    """State handler that logs diagnostics when the task enters a failed state.

    When ``new_state.is_failed()`` is true, logs the new state, the
    ``inputs`` attached to its result, the ``parameters`` entry of the
    Prefect context, and ``task.result``. Otherwise nothing is logged.

    Args:
        task: The task whose state is changing.
        old_state: The state being left (unused).
        new_state: The state being entered.

    Returns:
        ``new_state`` unchanged.
    """
    if new_state.is_failed():
        logger = prefect.context.get("logger")

        # Task Variables
        logger.info("Task inputs")
        logger.info(new_state)
        # NOTE(review): assumes the failed state's result is an exception
        # exposing ``.inputs`` (e.g. AnotherError) — confirm.
        logger.info(new_state.result.inputs)

        # Parameters
        logger.info("Parameters")
        logger.info(prefect.context.get("parameters", {}))

        logger.info("Task Result")
        logger.info(task.result)
    # Hand the state back explicitly so the transition is left untouched.
    return new_state
# Error Task
@task(state_handlers=[onFailureTask])
def myTask(str_param):
    """Log ``str_param`` and then always fail with ``AnotherError``.

    The input is attached to the exception under the key ``'str_param'``.

    Args:
        str_param: Value to log and attach to the raised error.

    Raises:
        AnotherError: Always.
    """
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})
# Build the flow: the parameter is passed straight to the failing task.
with Flow("flow_must_fail") as flow:
    param1 = Parameter('param1', default="XXX")
    a = myTask(param1)
Alfie
07/27/2021, 12:24 AM
Kevin Kho
07/27/2021, 12:26 AM
`TaskRunView` as seen here. If you want to stick to 0.13.0, I think you need to query the GraphQL API manually at the flow state handler.
Alfie
07/27/2021, 12:30 AM
Kevin Kho
07/27/2021, 12:32 AM
Alfie
07/27/2021, 12:34 AM