Domenico Di Gangi
05/29/2022, 10:48 AMAnna Geller
05/29/2022, 10:56 AMprefect-slack
collection, I assume you rather ask for Prefect 2.0?Domenico Di Gangi
05/29/2022, 10:57 AMAnna Geller
05/29/2022, 11:02 AMfrom prefect import task, flow
from prefect.futures import PrefectFuture
from prefect_slack import SlackWebhook
from prefect_slack.messages import send_incoming_webhook_message
def send_slack_alert_on_failure(task_future: PrefectFuture):
task_future.wait() # block until completion
if task_future.get_state().is_failed():
name_ = task_future.task_run.name
id_ = task_future.task_run.flow_run_id
send_incoming_webhook_message(
slack_webhook=SlackWebhook("<https://hooks.slack.com/services/xx/xx/xxx>"),
text=f"The task `{name_}` failed in a flow run `{id_}`",
)
@task
def fail_successfully(x):
return 1 / x
@flow
def main_flow(nr: int):
future_obj = fail_successfully(nr)
send_slack_alert_on_failure(future_obj)
if __name__ == "__main__":
main_flow(nr=0)
Domenico Di Gangi
05/29/2022, 11:19 AM