Billy McMonagle
12/20/2021, 4:51 PM
Billy McMonagle
12/20/2021, 4:51 PM
Billy McMonagle
12/20/2021, 4:52 PM
HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError("HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)"))
In addition to the failures, several flows hung around for about 2 hours (you can see the tall bars). Some completed successfully after that time, and one failed.
Billy McMonagle
12/20/2021, 4:54 PM
Billy McMonagle
12/20/2021, 4:58 PM
Zanie
Zanie
Billy McMonagle
12/20/2021, 5:04 PM
Billy McMonagle
12/20/2021, 5:04 PM
# Import paths assumed for Prefect 1.x
from typing import Callable, Union

import prefect
from prefect import Flow, Task
from prefect.tasks.notifications import SlackTask
from prefect.utilities.notifications import callback_factory


def make_slack_failure_notification(slack_webhook_function: Callable):
    """
    Returns a callable which can be supplied to the callback_factory.
    """
    def _slack_failure_notification(obj, new_state):
        """
        State handler that posts to Slack on flow or task failure.
        """
        msg = _construct_failure_message(obj)
        # The webhook URL is resolved at notification time, not at definition time.
        SlackTask().run(message=msg, webhook_url=slack_webhook_function())
    return _slack_failure_notification


def _construct_failure_message(obj: Union[Flow, Task]):
    """
    Given Flow/Task object, get contextual info and return failure message
    """
    project_name = prefect.context.get("project_name")
    flow_name = prefect.context.get("flow_name")
    if isinstance(obj, Flow):
        run_type = "flow"
        run_id = prefect.context.get("flow_run_id")
        run_url = f"<https://cloud.prefect.io/flow-run/{run_id}>"
        task_name = ""
    elif isinstance(obj, Task):
        run_type = "task"
        run_id = prefect.context.get("task_run_id")
        run_url = f"<https://cloud.prefect.io/task-run/{run_id}>"
        task_name = f".{obj.name}"
    msg = (
        f"Error in {run_type} {project_name}.{flow_name}{task_name}\n"
        f"For more, see: {run_url}"
    )
    return msg


def make_failure_callback(slack_webhook_function: Callable):
    """
    Return a callback function which can be passed as a state handler
    to a Prefect Flow or Task.
    Failure messages will be posted to the provided webhook url.
    """
    return callback_factory(
        fn=make_slack_failure_notification(slack_webhook_function),
        check=lambda state: state.is_failed(),
    )
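For anyone reading along, here is a rough, standard-library-only sketch of how the `check` lambda gates the notification in the snippet above. It assumes the handler has the `(obj, old_state, new_state)` shape that Prefect 1.x state handlers use. `FakeState`, `sketch_callback_factory`, and `fake_notify` are hypothetical stand-ins made up for illustration, not Prefect's own classes or its actual `callback_factory` implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state; only is_failed() is modeled."""
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


def sketch_callback_factory(
    fn: Callable[[Any, Any], None],
    check: Callable[[Any], bool],
) -> Callable[[Any, Any, Any], Any]:
    """Simplified stand-in for callback_factory: call fn only when check passes."""

    def state_handler(obj: Any, old_state: Any, new_state: Any) -> Any:
        if check(new_state):
            fn(obj, new_state)
        # The handler hands the unchanged new state back to its caller.
        return new_state

    return state_handler


sent: List[str] = []  # records messages instead of posting to Slack


def fake_notify(obj: Any, state: Any) -> None:
    """Hypothetical notifier that stores the message locally."""
    sent.append(f"Error in {obj}")


handler = sketch_callback_factory(fake_notify, check=lambda s: s.is_failed())

# A transition into a non-failed state sends nothing.
handler("my-flow", FakeState(failed=False), FakeState(failed=False))
# A transition into a failed state triggers exactly one notification.
handler("my-flow", FakeState(failed=False), FakeState(failed=True))
print(sent)
```

In the real snippet, the object returned by `make_failure_callback(...)` would presumably be passed in the `state_handlers` list of a Flow or Task, so the Slack call only runs on transitions into a failed state.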
Zanie
Yusuf Khan
12/21/2021, 2:15 AM
Billy McMonagle
12/21/2021, 10:34 PM
Zanie