Billy McMonagle
02/17/2022, 9:50 PMdo_something
task to the cleanup_on_failure
task. What is the correct way to do this?
import random
from prefect.triggers import all_failed
from prefect import task, Flow
@task(name="Do Something")
def do_something():
if random.random() > 0.5:
raise ValueError("Non-deterministic error has occured.")
@task(name="Cleanup Task", trigger=all_failed)
def cleanup_on_failure():
pass
with Flow("Trigger example") as flow:
fail = cleanup_on_failure(upstream_tasks=[do_something])
flow.set_reference_tasks([do_something])
Anna Geller
import random
from prefect.triggers import all_failed
from prefect import task, Flow
@task(name="Do Something")
def do_something(x):
if random.random() > 0.5:
raise ValueError(f"Non-deterministic error has occured. Got value {x}")
return x
@task(name="Cleanup Task", trigger=all_failed, log_stdout=True)
def cleanup_on_failure(x):
print(f"Cleaning up. Got value {x}")
with Flow("Trigger example") as flow:
sth = do_something(2)
fail = cleanup_on_failure(sth)
flow.set_reference_tasks([sth])
if __name__ == "__main__":
flow.run()
The above flow will pass the exception message "Got value Non-deterministic error has occured. Got value 2" to the cleanup task, when it fails.
The follow up question: yes, it's possible to pass data to downstream task even it the task fails, but this data will be an exception message - if you need that e.g. to pass the reason why a task failed to a failure notification task, this should workBilly McMonagle
02/17/2022, 10:15 PMFailed
state and add a custom result attribute to it? Since the exception would be passed as the state.message
if I understand correctly.Anna Geller
@task
def notify_on_all_failures():
pass
with Flow() as flow:
t1 = task1()
t2 = task2()
notify_on_all_failures(upstream_tasks=[t1, t2], trigger=any_failed)
This way, if any of those upstream tasks fails, you will send a single notification on failure.Billy McMonagle
02/17/2022, 10:38 PMAnna Geller
team_slug = "anna-prefect" # replace by your team-slug
run_id = prefect.context.get("flow_run_id")
url = f"<https://cloud.prefect.io/{team_slug}/flow-run/{run_id}>"
Billy McMonagle
02/17/2022, 10:48 PMAnna Geller
notify_on_all_failures
task. Definitely possible using FlowRunView and TaskRunView, but involves a bit more workBilly McMonagle
02/17/2022, 11:00 PMdef is_flow(obj):
return isinstance(obj, prefect.Flow)
def is_task(obj):
return isinstance(obj, prefect.Task)
def get_cloud_url(tracked_obj: TrackedObjectType):
"""
Return contextually appropriate cloud URL
"""
if flow_run_id := prefect.context.get("flow_run_id"):
if is_flow(tracked_obj):
return prefect.client.Client().get_cloud_url("flow-run", flow_run_id)
elif is_task(tracked_obj):
return prefect.client.Client().get_cloud_url(
"task-run", prefect.context.get("task_run_id", "")
)
def construct_exception_blocks(
tracked_obj: TrackedObjectType, state: "prefect.engine.state.State",
):
"""
Construct blocks to be appended to main message with contextual error info.
This function can handle flows or tasks in a failed state. Every failed task
within a failed flow will be appended as a separate block.
Blocks can hold up to 3000 characters.
Args:
tracked_obj: (prefect.Flow, prefect.Task) Object containing error information.
state: (prefect.engine.state.State) State containing error information.
"""
blocks = []
try:
if is_task(tracked_obj):
exception_block = {
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"```{repr(state.result)[:2990]}```", # stay under block limit of 3000 chars
},
}
blocks.append(exception_block)
elif is_flow(tracked_obj):
for result in state.result.values():
if result.is_failed():
exception_block = {
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"```{result.message[:2990]}```", # stay under block limit of 3000 chars
},
}
blocks.append(exception_block)
except:
blocks.append(
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Unable to construct exception block, see logs for details.",
},
}
)
return blocks
def construct_parameter_block():
"""
Return block with pretty printed json of parameters, if any.
"""
if parameters := prefect.context.get("parameters", {}):
return {
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Parameters*:\n```{json.dumps(parameters, indent=2, sort_keys=True)}```",
},
}
def construct_message_block(mrkdwn_text):
"""
Return block containing an arbitrary messaage
Params
mrkdwn_text: markdown-formatted text string
For reference, see: <https://api.slack.com/messaging/composing/formatting>
"""
return {"type": "section", "text": {"type": "mrkdwn", "text": mrkdwn_text}}
def construct_payload(
tracked_obj: TrackedObjectType,
state: "prefect.engine.state.State",
custom_message_callback: Callable = None,
):
"""
Construct standard payload using Slack's Block Kit.
Includes parameter and exception information when relevant.
Use optional parameter `custom_message_callback` to add a custom message to the end of the payload.
For reference details, see: <https://api.slack.com/block-kit>
Args:
tracked_obj: (prefect.Flow, prefect.Task) The object in question, which will be
inspected to produce an appropriate message.
state: (prefect.engine.state.State) The state that the tracked object has
transitioned to. State details are used to enrich the message payload.
"""
project_name = prefect.context.get("project_name", "< Project Name Not Found >")
flow_name = prefect.context.get("flow_name", "< Flow Name Not Found >")
url = get_cloud_url(tracked_obj)
blocks = [
{"type": "divider"},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": "*Status*"},
{"type": "mrkdwn", "text": "*Project*"},
{"type": "mrkdwn", "text": f"*<{url}|{type(state).__name__}>*",},
{"type": "plain_text", "text": project_name},
{"type": "mrkdwn", "text": "*Flow*"},
{"type": "mrkdwn", "text": "*Message*"},
{"type": "plain_text", "text": flow_name},
{"type": "plain_text", "text": getattr(state, "message", "< None >")},
],
},
]
if parameter_block := construct_parameter_block():
blocks.append(parameter_block)
if state.is_failed():
exception_blocks = construct_exception_blocks(tracked_obj, state)
blocks.extend(exception_blocks)
if custom_message_callback:
blocks.append(custom_message_callback())
return {"blocks": blocks}
def make_slack_notification(
webhook_parameter_name: str, custom_message_callback: Callable = None
):
"""
Returns a callable which can be supplied to the callback_factory.
This function can be incorporated into state handlers for tasks or flows.
"""
def _slack_notification(
obj: TrackedObjectType, new_state: "prefect.engine.state.State"
):
"""
State handler function. Posts useful information to Slack, including:
* Status, Project, Flow, Parameter(s), Error Message(s)
"""
slack_webhook_url = get_slack_webhook_url(webhook_parameter_name)
payload = construct_payload(obj, new_state, custom_message_callback)
resp = <http://requests.post|requests.post>(slack_webhook_url, json=payload)
if resp.status_code != 200:
# If original message was malformed or there was some other issue,
# try to ensure that _something_ is posted.
<http://requests.post|requests.post>(
slack_webhook_url,
json={
"text": f"{obj} is in state {new_state}\nFull response is unavailable at this time, please review logs."
},
)
return _slack_notification
def state_handler_factory(
webhook_parameter_name: str = None,
check: Callable = None,
custom_message_callback: Callable = None,
):
"""
Use this function to build a custom state handler for prefect Flows or Tasks.
Args:
- webhook_parameter_name (str): an SSM parameter that should resolve to a Slack Webhook URL
- check (Callable): a function with signature `check(state: State) -> bool`
that is used for determining when the callback function should be called
- custom_message_callback (Callable): a function that will be called during execution
of the state handler, and will return a Slack Block Kit block.
"""
if check is None:
check = lambda state: state.is_failed()
return callback_factory(
fn=make_slack_notification(
webhook_parameter_name=webhook_parameter_name,
custom_message_callback=custom_message_callback,
),
check=check,
)
construct_exception_blocks
function may be the way to go but we'll see if it works.Anna Geller
Billy McMonagle
02/18/2022, 1:53 AMKevin Kho
isinstance(state, Failed)
and this will resolve to True so you can subclass the Failed state and embed an object in there too for later retrieval. I’m positive at least.Billy McMonagle
02/18/2022, 2:40 AMException
object from a task state handler, while I only see the string representation of the Exception in a flow state handler.Kevin Kho
Billy McMonagle
02/18/2022, 2:44 AMfrom prefect import Flow, task
class MySpecialException(Exception):
def __init__(self, message, detail):
self.message = message
self.detail = detail
super().__init__(message)
class ImportantThing:
def __init__(self):
self.failed = []
def run(self, inputs):
for input in inputs:
try:
input / 0
except:
self.failed.append(input)
def state_handler(obj, old, new):
if new.is_failed():
print("; ".join([str(x) for x in new.result.detail]))
@task(state_handlers=[state_handler])
def do_the_thing():
thing = ImportantThing()
thing.run(inputs=[1, 2, 3, 4, 5])
if thing.failed:
raise MySpecialException("Some failures", detail=thing.failed)
with Flow("Thing Do") as flow:
do_the_thing()
res = flow.run()
Kevin Kho
Anna Geller