Alfie
07/27/2021, 12:03 AM
Kevin Kho
.result.location and then load it. If it’s something like an input, you have to do something like this:
Edited: wrong code snippet, looking for the right one.
Kevin Kho
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult
class AnotherError(Exception):
    """Exception that carries a message together with a dict of inputs.

    Attributes:
        message: The text passed on to ``Exception`` (so ``str(err)`` shows it).
        inputs: The ``inputs`` mapping supplied by the raiser, stored as given.
    """

    def __init__(self, message, inputs: Dict):
        # Hand the message to the base class first; the attributes below are
        # independent of that call, so the order does not matter.
        super().__init__(message)
        self.inputs = inputs
        self.message = message
def myStateHandler(task, old_state, new_state):
    """Log the task's ``result`` object and its location once the task finishes.

    Written in the ``(task, old_state, new_state)`` state-handler shape.

    Args:
        task: The task whose state changed; its ``result`` attribute is logged.
        old_state: The previous state (unused).
        new_state: The state being entered; only finished states are logged.

    Returns:
        None.
    """
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        # The chat export had turned each call into Slack link markup
        # (`<http://logger.info|logger.info>`), which is not valid Python.
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)
def onFailureTask(task, state):
    """Log diagnostic details about a failed task.

    Written in the ``(task, state)`` callback shape used with ``on_failure``.

    Args:
        task: The failed task; its ``result`` attribute is logged.
        state: The failure state. ``state.result`` is expected to expose an
            ``inputs`` attribute (as ``AnotherError`` does) -- NOTE(review):
            any other exception type would raise ``AttributeError`` here.
    """
    logger = prefect.context.get("logger")
    # Task Variables
    # (logger calls restored from Slack link markup, which was a syntax error)
    logger.info("Task inputs")
    logger.info(state)
    logger.info(state.result.inputs)
    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
    logger.info("Task Result")
    logger.info(task.result)
@task(
    state_handlers=[myStateHandler],
    result=GCSResult(bucket="prefect-kvnkho", location="1.txt"),
)
def abc(x):
    """Pass ``x`` through unchanged."""
    value = x
    return value
# Error Task
@task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
def myTask(str_param):
    """Log ``str_param`` and then always fail with ``AnotherError``.

    Args:
        str_param: Value to log; it is also attached to the raised error.

    Raises:
        AnotherError: Always, with ``inputs={'str_param': str_param}``.
    """
    logger = prefect.context.get("logger")
    # Restored from Slack link markup, which was a syntax error.
    logger.info(str(str_param))
    # Plain string: the original f-string had no placeholders.
    raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})
# Build the "flow_must_fail" flow: param1 feeds abc, whose output feeds myTask.
with Flow("flow_must_fail") as flow:
    # Flow parameter declared with default "XXX".
    param1 = Parameter('param1',default="XXX")
    a = abc(param1)
    # myTask raises AnotherError unconditionally, so this step is the failing one.
    b = myTask(a)
flow.run()
Kevin Kho
# Import Library
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult
class AnotherError(Exception):
    """Error type that records the inputs supplied alongside its message."""

    def __init__(self, message, inputs: Dict):
        # Store both values on the instance, then let Exception keep the
        # message so that str(err) returns it.
        self.inputs, self.message = inputs, message
        super().__init__(message)
def onFailureTask(task, old_state, new_state):
    """State handler that logs diagnostics when the task enters a failed state.

    Args:
        task: The task whose state changed; its ``result`` attribute is logged.
        old_state: The previous state (unused).
        new_state: The state being entered. When failed, ``new_state.result``
            is expected to expose an ``inputs`` attribute (as ``AnotherError``
            does) -- NOTE(review): other exception types would raise
            ``AttributeError`` here.

    Returns:
        None.
    """
    if not new_state.is_failed():
        return
    logger = prefect.context.get("logger")
    # Task Variables
    # (logger calls restored from Slack link markup, which was a syntax error)
    logger.info("Task inputs")
    logger.info(new_state)
    logger.info(new_state.result.inputs)
    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
    logger.info("Task Result")
    logger.info(task.result)
# Error Task
@task(state_handlers=[onFailureTask])
def myTask(str_param):
    """Log ``str_param`` and then always fail with ``AnotherError``.

    Args:
        str_param: Value to log; it is also attached to the raised error.

    Raises:
        AnotherError: Always, with ``inputs={'str_param': str_param}``.
    """
    logger = prefect.context.get("logger")
    # Restored from Slack link markup, which was a syntax error.
    logger.info(str(str_param))
    # Plain string: the original f-string had no placeholders.
    raise AnotherError("Error Message Is Here", inputs={'str_param': str_param})
# Build the "flow_must_fail" flow around myTask, which always raises.
with Flow("flow_must_fail") as flow:
    # Flow parameter declared with default "XXX".
    param1 = Parameter('param1',default="XXX")
    a = myTask(param1)
Alfie
07/27/2021, 12:24 AM
Kevin Kho
TaskRunView, as seen here. If you want to stick to 0.13.0, I think you need to query the GraphQL API manually in the flow state handler.
Kevin Kho
Alfie
07/27/2021, 12:30 AM
Kevin Kho
Alfie
07/27/2021, 12:34 AM