Hi everyone — I have a mapped task and in case one...
# ask-community
b
Hi everyone — I have a mapped task, and if one of the “entries” fails, I want my handler to know what the arguments were for the one that failed. Is there any way to get that? I already get the error message for each mapped task via the “map_states” attribute.
k
Hey @Bruno Murino, you need to attach it to the Error to get it in the state handler. You can pull it out of the message, or you could do something like this:
Copy code
# Import Library
import prefect
from prefect import task, Flow, Parameter
from datetime import timedelta
from prefect.backend import FlowView
from prefect.backend.flow_run import FlowRunView
from typing import Dict

class RaiseErrorClass(Exception):
    """Base class for the custom exceptions defined in this example."""


class AnotherError(RaiseErrorClass):
    """Exception that carries the inputs of the task call that failed.

    Args:
        message: Human-readable error message; also used as ``str(error)``.
        inputs: Mapping of argument name to the value the failing call
            received, so a state handler can read it back from the error.
    """

    def __init__(self, message, inputs: Dict):
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)

    def __reduce__(self):
        """Return pickling instructions that include ``inputs``.

        ``BaseException`` pickles as ``cls(*self.args)``, and ``args`` only
        holds the message here, so without this override unpickling would
        call ``AnotherError(message)`` and raise ``TypeError`` for the
        missing ``inputs`` argument.
        """
        return (type(self), (self.message, self.inputs), vars(self))


def onFailureTask(task, state):
    """Log diagnostic context for a task that entered a failed state.

    Meant to be registered with ``@task(on_failure=onFailureTask)``. It logs
    the flow's project name, the failed state together with the inputs
    attached to the raised error, the flow/flow-run identifiers found in
    ``prefect.context``, and the flow-run parameters.

    Args:
        task: The task that failed (not used by this handler).
        state: The failed state; ``state.result`` is read as the raised
            exception, which may expose an ``inputs`` attribute.
    """
    logger = prefect.context.get("logger")

    # Project name
    flow_view = FlowView.from_flow_id(prefect.context.get("flow_id"))
    logger.info(flow_view)
    logger.info("Project name")
    logger.info(flow_view.project_name)

    # Task Variables
    logger.info("Task inputs")
    logger.info(state)
    # Only errors that were given an `inputs` attribute carry the task
    # arguments; fall back to None so that an unrelated failure does not
    # make the handler itself raise AttributeError.
    logger.info(getattr(state.result, "inputs", None))

    # Flow stuff
    for context_key in (
        "flow_name",
        "flow_id",
        "flow_run_id",
        "flow_run_version",
        "flow_run_name",
    ):
        logger.info(str(prefect.context.get(context_key)))

    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
    
# Error Task
@task(on_failure=onFailureTask)
def myTask(str_param):
    """Log ``str_param`` and then always fail with an ``AnotherError``.

    The argument is attached to the raised error as ``inputs`` so that the
    ``on_failure`` handler can recover which call failed.

    Args:
        str_param: Value to log and to record on the raised error.

    Raises:
        AnotherError: Always, with ``inputs={'str_param': str_param}``.
    """
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError(
        "Error Message Is Here balabala",
        inputs={"str_param": str_param},
    )

# Assemble the demo flow: one parameter feeds the task that always raises.
flow = Flow("flow_must_fail")
with flow:
    param1 = Parameter("param1", default="XXX")
    myTask(param1)
b
Hi @Kevin Kho, this is useful! I think I’ll need a try/except block that catches all exceptions, adds the extra data to the exception, and then re-raises it so that I retain the original error — I’ll have a play with this! Thanks!