    Alfie

    1 year ago
    Hi Team, I’m using flow.state_handlers (Prefect Core 0.13) to run some actions when the flow’s state changes. The callback takes three params: flow, old_state, new_state. I want the handler to access some data generated during the flow execution, which is needed to assist the triggered action. Any suggestions on how I can make the data accessible in the handler? Thanks
    Kevin Kho

    1 year ago
    Hey @Alfie, you need to attach it to the result somehow. If it’s something a task returned, you can use .result.location and then load it. If it’s something like an input, you have to do something like this (edited, since I posted the wrong snippet at first):
    # Import libraries
    from typing import Dict
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.results import GCSResult
    
    class AnotherError(Exception):
        """Exception that carries the failing task's inputs along with its message."""
        def __init__(self, message, inputs: Dict):
            self.message = message
            self.inputs = inputs
            super().__init__(self.message)
    
    def myStateHandler(task, old_state, new_state):
        logger = prefect.context.get("logger")
        if new_state.is_finished():
            logger.info("Task Result")
            logger.info(task.result)
            logger.info(task.result.location)
        # Return the state so the runner (and any later handler) receives it
        return new_state
    
    def onFailureTask(task, state):
        logger = prefect.context.get("logger")
        # Task variables: a failed state's result is the raised exception
        logger.info("Task inputs")
        logger.info(state)
        logger.info(state.result.inputs)
        # Parameters
        logger.info("Parameters")
        logger.info(prefect.context.get("parameters", {}))
        logger.info("Task Result")
        logger.info(task.result)
    
    @task(state_handlers=[myStateHandler], result=GCSResult(bucket="prefect-kvnkho", location="1.txt"))
    def abc(x):
        return x
    
    # Error Task
    @task(on_failure=onFailureTask, result=GCSResult(bucket="prefect-kvnkho"))
    def myTask(str_param):
        logger = prefect.context.get("logger")
        logger.info(str(str_param))
        raise AnotherError("Error Message Is Here", inputs={"str_param": str_param})
    
    with Flow("flow_must_fail") as flow:
        param1 = Parameter("param1", default="XXX")
        a = abc(param1)
        b = myTask(a)
    
    flow.run()
    I condensed it. Sorry, the previous one had too much fluff:
    # Import libraries
    from typing import Dict
    import prefect
    from prefect import task, Flow, Parameter
    
    class AnotherError(Exception):
        """Exception that carries the failing task's inputs along with its message."""
        def __init__(self, message, inputs: Dict):
            self.message = message
            self.inputs = inputs
            super().__init__(self.message)
    
    def onFailureTask(task, old_state, new_state):
        if new_state.is_failed():
            logger = prefect.context.get("logger")
            # Task variables: a failed state's result is the raised exception
            logger.info("Task inputs")
            logger.info(new_state)
            logger.info(new_state.result.inputs)
    
            # Parameters
            logger.info("Parameters")
            logger.info(prefect.context.get("parameters", {}))
            logger.info("Task Result")
            logger.info(task.result)
        # Return the state so the runner (and any later handler) receives it
        return new_state
    
    # Error Task
    @task(state_handlers=[onFailureTask])
    def myTask(str_param):
        logger = prefect.context.get("logger")
        logger.info(str(str_param))
        raise AnotherError("Error Message Is Here", inputs={"str_param": str_param})
    
    with Flow("flow_must_fail") as flow:
        param1 = Parameter("param1", default="XXX")
        a = myTask(param1)
    
    # Run locally to trigger the failure and the handler
    flow.run()
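For anyone who wants to see the mechanism in the condensed snippet without a Prefect install, here is a minimal sketch in plain Python. It only imitates the idea: the task raises an exception that carries its inputs, the failure is wrapped in a state whose result is that exception, and the handler reads new_state.result.inputs. FakeFailedState, run_with_handler, and captured are hypothetical stand-ins invented for this illustration, not Prefect APIs.

```python
from typing import Any, Dict, List


class AnotherError(Exception):
    """Exception that carries the inputs of the task that raised it."""

    def __init__(self, message: str, inputs: Dict[str, Any]):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)


class FakeFailedState:
    """Hypothetical stand-in for a failed state: `result` holds the exception."""

    def __init__(self, exc: Exception):
        self.result = exc

    def is_failed(self) -> bool:
        return True


# Data collected by the handler, so it can be inspected after the run.
captured: List[Dict[str, Any]] = []


def on_failure_handler(task, old_state, new_state):
    # Same shape as the handler in the thread: read the inputs off the exception.
    if new_state.is_failed():
        captured.append(new_state.result.inputs)
    return new_state


def my_task(str_param: str) -> None:
    raise AnotherError("Error Message Is Here", inputs={"str_param": str_param})


def run_with_handler(fn, handler, **kwargs):
    """Tiny imitation of a task runner: wrap the exception in a state, call the handler."""
    try:
        fn(**kwargs)
    except Exception as exc:
        return handler(fn, None, FakeFailedState(exc))
    return None


final_state = run_with_handler(my_task, on_failure_handler, str_param="XXX")
print(captured)
```

The point of the design is that the exception object is the only thing the failed state is guaranteed to carry, so anything the handler needs has to travel on it.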
    Alfie

    1 year ago
    Thanks @Kevin Kho! I want to access the data in a flow’s state handler. For example, the data is initialized in one task and used when the flow ends. Which way do you think is better?
    Kevin Kho

    1 year ago
    This was a known pain point, so we addressed it in 0.15.0 with TaskRunView, as seen here. If you want to stick to 0.13.0, I think you need to query the GraphQL API manually in the flow state handler. See this for querying, and then do that inside the state handler.
    Alfie

    1 year ago
    Cool, let me try it. Thanks!
    Kevin Kho

    1 year ago
    Maybe for you it’s a matter of adding state_result and state into the query? Not sure if that gets you what you want.
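A minimal sketch of what such a query might look like when built inside the handler. The field names state and state_result come from the message above; the task_run table, the flow_run_id filter, and the nested task { slug } selection are assumptions about the backend's GraphQL schema and should be checked against your own API. build_task_run_query is a hypothetical helper.

```python
import json


def build_task_run_query(flow_run_id: str) -> str:
    """Build a GraphQL query for the task runs of one flow run.

    The schema details (task_run, flow_run_id, task.slug) are assumptions;
    state and state_result are the fields suggested in the thread.
    """
    # json.dumps yields a double-quoted, escaped string literal.
    run_id_literal = json.dumps(flow_run_id)
    return (
        "query {\n"
        "  task_run(where: {flow_run_id: {_eq: %s}}) {\n"
        "    id\n"
        "    state\n"
        "    state_result\n"
        "    task { slug }\n"
        "  }\n"
        "}" % run_id_literal
    )


query = build_task_run_query("abc-123")
print(query)
# Inside the flow state handler, the query could then be sent with
# prefect.Client().graphql(query), using the flow_run_id from prefect.context.
```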
    Alfie

    1 year ago
    I think so, got your point. Thanks