Bruno Murino
07/21/2021, 1:34 PM
Kevin Kho
# Import Library
import prefect
from prefect import task, Flow, Parameter
from datetime import timedelta
from prefect.backend import FlowView
from prefect.backend.flow_run import FlowRunView
from typing import Dict
class RaiseErrorClass(Exception):
    """Base class for the custom errors defined in this file."""


class AnotherError(RaiseErrorClass):
    """Error that carries the inputs the failing code was called with.

    Attributes:
        message: Description of the failure; also used as the exception text.
        inputs: Mapping of input names to the values that were in play when
            the error was raised.
    """

    def __init__(self, message: str, inputs: Dict) -> None:
        self.message = message
        self.inputs = inputs
        super().__init__(self.message)

    def __reduce__(self):
        """Return the pickling recipe ``(class, (message, inputs))``."""
        # Exception's default pickling rebuilds the instance from ``args``,
        # which only holds ``message``; unpickling would then call
        # ``AnotherError(message)`` and fail on the missing ``inputs``.
        return (type(self), (self.message, self.inputs))
def onFailureTask(task, state):
    """Log diagnostic details about a failed task run.

    Used in this file as the ``on_failure`` callback of ``myTask``. Everything
    is read from ``prefect.context`` and from ``state`` and written to the
    context logger; nothing is returned.

    Args:
        task: The task the callback was invoked for (not used here).
        state: The failure state. ``state.result`` is presumably the raised
            exception (verify against the Prefect version in use); its
            ``inputs`` attribute is logged when present.
    """
    logger = prefect.context.get("logger")

    # Project name
    flow_id = prefect.context.get("flow_id")
    if flow_id is not None:
        flow_view = FlowView.from_flow_id(flow_id)
        logger.info(flow_view)
        logger.info("Project name")
        logger.info(flow_view.project_name)
    else:
        # Without a flow id there is nothing to look up; skip the lookup so
        # the handler does not raise on top of the task failure.
        logger.warning("No flow_id in context; skipping project name lookup")

    # Task Variables
    logger.info("Task inputs")
    logger.info(state)
    # Only AnotherError carries ``inputs``; any other result logs None
    # instead of raising AttributeError inside the handler.
    logger.info(getattr(state.result, "inputs", None))

    # Flow stuff
    for key in (
        "flow_name",
        "flow_id",
        "flow_run_id",
        "flow_run_version",
        "flow_run_name",
    ):
        logger.info(str(prefect.context.get(key)))

    # Parameters
    logger.info("Parameters")
    logger.info(prefect.context.get("parameters", {}))
# Error Task
@task(on_failure=onFailureTask)
def myTask(str_param):
    """Log ``str_param`` and then always fail with ``AnotherError``.

    The raised error carries ``{'str_param': str_param}`` as its ``inputs``,
    which ``onFailureTask`` logs when it runs as the ``on_failure`` callback.

    Args:
        str_param: Value to log and attach to the error.

    Raises:
        AnotherError: Always.
    """
    logger = prefect.context.get("logger")
    logger.info(str(str_param))
    raise AnotherError(
        "Error Message Is Here balabala",
        inputs={'str_param': str_param},
    )
# Flow with a single parameter fed into the always-failing task.
with Flow("flow_must_fail") as flow:
    param1 = Parameter('param1', default="XXX")
    myTask(param1)
Bruno Murino
07/21/2021, 2:02 PM