Hi Team, now I’m using (Prefect core 0.13) flow.st...
# prefect-server
a
Hi Team, I’m currently using flow.state_handlers (Prefect Core 0.13) to run some actions whenever the state changes. The callback takes three params: flow, old_state, new_state. I want to access some data generated during flow execution from inside the handler, where it is used to assist the triggered action. Any suggestions on how I can make that data accessible in the handler? Thanks
k
Hey @Alfie, you need to somehow attach it to the result. If it’s the thing returned, you can use the .result.location and then load it. If it’s something like an input, you have to do something like this: Edited: wrong code snippet looking for the right one
Right code snippet:
# Import libraries
import prefect
from prefect import task, Flow, Parameter
from typing import Dict
from prefect.engine.results import GCSResult

class AnotherError(Exception):
    # Custom exception that carries the task inputs alongside the message
    def __init__(self, message, inputs: Dict):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)

def myStateHandler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info("Task Result")
        logger.info(task.result)
        logger.info(task.result.location)
    return new_state

def onFailureTask(task, state):
    logger = prefect.context.get("logger")
    # Task variables: state.result is the raised AnotherError
    logger.info("Task inputs")
    logger.info(state)
    logger.info(state.result.inputs)
    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
    logger.info("Task Result")
    logger.info(task.result)

@task(state_handlers=[myStateHandler], result=GCSResult(bucket="prefect-kvnkho", location="1.txt"))
def abc(x):
    return x

# Error Task
@task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
def myTask(str_param):
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={"str_param": str_param})

with Flow("flow_must_fail") as flow:
    param1 = Parameter("param1", default="XXX")
    a = abc(param1)
    b = myTask(a)

flow.run()
I condensed it. Sorry the previous one had too much fluff:
# Import libraries
import prefect
from prefect import task, Flow, Parameter
from typing import Dict

class AnotherError(Exception):
    # Custom exception that carries the task inputs alongside the message
    def __init__(self, message, inputs: Dict):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)

def onFailureTask(task, old_state, new_state):
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        # Task variables: new_state.result is the raised AnotherError
        logger.info("Task inputs")
        logger.info(new_state)
        logger.info(new_state.result.inputs)

        # Parameters
        logger.info("Parameters")
        logger.info(prefect.context.get("parameters", {}))
        logger.info("Task Result")
        logger.info(task.result)
    return new_state

# Error Task
@task(state_handlers=[onFailureTask])
def myTask(str_param):
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError("Error Message Is Here", inputs={"str_param": str_param})

with Flow("flow_must_fail") as flow:
    param1 = Parameter("param1", default="XXX")
    a = myTask(param1)
a
Thanks @Kevin Kho! I want to access the data in a flow’s state handler. For example, the data is initialized in one task and used when the flow ends. Which way do you think is better?
k
This was a known pain point, so we addressed it in 0.15.0 with TaskRunView, as seen here. If you want to stick to 0.13.0, I think you need to query the GraphQL API manually in the flow state handler.
See this for querying and then do that inside the state handler.
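For the 0.15.0 route mentioned above, a rough sketch of a flow state handler that looks up one task run and loads its result could look like the following. It assumes the flow runs against a backend (so that flow_run_id is in the context), that the task has a Result configured so its value was persisted, and that the task's slug is known. TASK_SLUG and report_on_finish are hypothetical names picked for illustration, not something from this thread.

```python
# Hypothetical slug of the task whose data the handler needs.
TASK_SLUG = "init_data-1"


def report_on_finish(flow, old_state, new_state):
    """Flow state handler: load one task's result once the flow run finishes."""
    if not new_state.is_finished():
        return new_state

    # Imported lazily because prefect.backend only exists in Prefect >= 0.15.0.
    import prefect
    from prefect.backend import FlowRunView

    flow_run_id = prefect.context.get("flow_run_id")
    if flow_run_id is None:
        # No backend flow run to look up (assumption: purely local run).
        return new_state

    flow_run = FlowRunView.from_flow_run_id(flow_run_id)
    task_run = flow_run.get_task_run(task_slug=TASK_SLUG)
    value = task_run.get_result()  # reads from the persisted result location

    logger = prefect.context.get("logger")
    logger.info("Value from %s: %r", TASK_SLUG, value)
    return new_state
```

It would be attached with something like Flow("my-flow", state_handlers=[report_on_finish]); whether the task run's final state is already visible to the backend at that moment is worth verifying in your own setup.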
a
Cool, let me try it. Thanks!
k
Maybe for you it’s a matter of adding state_result and state into the query? Not sure if that gets you what you want.
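A minimal sketch of that idea for 0.13: build a task_run query that includes state and state_result, and run it from the flow state handler with the Prefect client. The field names state and state_result come from the message above; the task { slug } selection, the filter shape, and the helper names build_task_run_query and log_task_runs_on_finish are assumptions for illustration, so check them against your server's schema in the interactive API.

```python
import json


def build_task_run_query(flow_run_id: str) -> str:
    """Return a GraphQL query for all task runs of one flow run."""
    # json.dumps yields a double-quoted, escaped string literal.
    return (
        "query { task_run(where: {flow_run_id: {_eq: %s}}) "
        "{ id state state_result task { slug } } }" % json.dumps(flow_run_id)
    )


def log_task_runs_on_finish(flow, old_state, new_state):
    """Flow state handler: query the backend once the flow run finishes."""
    if new_state.is_finished():
        # Imported lazily so the query builder can be used without Prefect.
        import prefect

        flow_run_id = prefect.context.get("flow_run_id")
        if flow_run_id:
            logger = prefect.context.get("logger")
            response = prefect.Client().graphql(build_task_run_query(flow_run_id))
            for task_run in response.data.task_run:
                logger.info(
                    "%s: %s %s",
                    task_run.task.slug,
                    task_run.state,
                    task_run.state_result,
                )
    return new_state
```

Keeping the query in its own function makes it easy to paste the same string into the interactive API first and confirm the fields return what the handler needs.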
a
I think so, got your point. Thanks