Jean-Baptiste Six
11/22/2021, 2:05 PM
slack_notifier, without success.
• Then I tried SlackTask. This is my code (it didn't work):
def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = str(new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with an exception {value}"
        )
        SlackTask(
            message=msg,
            webhook_secret="https://hooks.slack.com/services/*******/*******/*******"
        ).run()
    return new_state
• Finally, I tried a custom solution (again, it didn't work):
def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_finished():
        msg = "Task {0} finished in state {1}".format(task, new_state)
        # replace with your Slack webhook URL secret name
        secret_slack = cast(str, Secret("https://hooks.slack.com/services/*******/*******/*******").get())
        requests.post(secret_slack, json={"text": msg})
    return new_state
This is my test flow:
@task
def task_error():
    raise Exception("Test")

with Flow("Test Slack", state_handlers=[post_to_slack_on_failure]) as flow:
    task_error()

flow.run()
(I also tried EmailTask with smtp_type="INSECURE", and once again it didn't work)
Need some help please
Anna Geller
webhook_secret expects the name of a Prefect Secret rather than the webhook URL itself. By default, it looks for a secret named SLACK_WEBHOOK_URL.
Anna Geller
def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_finished():
        msg = "Task {0} finished in state {1}".format(task, new_state)
        requests.post("https://hooks.slack.com/services/*******/*******/*******", json={"text": msg})
    return new_state
Jean-Baptiste Six
11/22/2021, 2:26 PM
<Response [404]>
Jean-Baptiste Six
11/22/2021, 2:31 PM
no_service
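For readers following the thread: as far as I know, Slack answers a successful incoming-webhook POST with status 200 and the body `ok`, and uses `no_service` when the webhook URL itself is disabled, removed, or invalid. The helper below is only a sketch of how that response could be turned into a readable hint. The function name `explain_webhook_response` is hypothetical and not part of Prefect or requests.

```python
def explain_webhook_response(status_code: int, body: str) -> str:
    """Turn a Slack incoming-webhook response into a short hint.

    Hypothetical helper for illustration; only the two cases seen in
    this thread are handled explicitly.
    """
    text = body.strip()
    if status_code == 200 and text == "ok":
        return "delivered"
    if text == "no_service":
        # Assumption based on Slack's webhook error list: the URL no longer
        # points at an active webhook, so the payload format is not the issue.
        return "webhook URL is disabled, removed, or invalid; generate a new one"
    return f"unexpected response {status_code}: {text}"
```

With the values reported here, `explain_webhook_response(404, "no_service")` points at the webhook URL rather than at the request syntax.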
Anna Geller
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from prefect.engine import signals

def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with a message `{new_state.result}`"
        )
        SlackTask(message=msg).run()
    return new_state

@task(state_handlers=[post_to_slack_on_failure])
def mission_critical_task(a, b):
    if a == b:
        raise signals.FAIL("a equaled b!")
    else:
        return 1 / (b - a)

with Flow(name="state-inspection-handler") as flow:
    result = mission_critical_task(1, 1)
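SlackTask(message=msg).run() above passes no webhook_secret, so it falls back to the secret named SLACK_WEBHOOK_URL. For a local run in Prefect 1.x, one way to supply that secret, as far as I know, is an environment variable with the prefix `PREFECT__CONTEXT__SECRETS__`, set before `prefect` is imported. The sketch below only builds and sets that variable. The helper names and the placeholder URL are hypothetical.

```python
import os

# Prefix Prefect 1.x reads local secrets from (assumption: local secrets
# are enabled, which is the default for flow.run() on your machine).
LOCAL_SECRET_PREFIX = "PREFECT__CONTEXT__SECRETS__"


def local_secret_env_var(secret_name: str) -> str:
    """Return the environment variable name for a local Prefect secret."""
    return f"{LOCAL_SECRET_PREFIX}{secret_name}"


def set_local_secret(secret_name: str, value: str) -> None:
    """Expose a secret to a local run; call this before importing prefect."""
    os.environ[local_secret_env_var(secret_name)] = value


# Placeholder URL for illustration only; use your real webhook URL.
set_local_secret(
    "SLACK_WEBHOOK_URL",
    "https://hooks.slack.com/services/XXX/YYY/ZZZ",
)
```

The secret name, not the URL, is what webhook_secret takes, so the handler code itself never needs to contain the webhook URL.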
I think the requests syntax is not entirely correct, it should be:
requests.post(url=webhook_url, data=json.dumps({"text": "your Slack msg"}))
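To make the shape of that request concrete, here is a minimal sketch that builds the same POST with only the standard library (urllib swapped in for requests, so it runs without extra packages). It assumes the webhook expects a JSON object with a `text` field, as in the call above. The function names and the example URL are hypothetical. As far as I know, requests' `json={"text": msg}` form serializes the dict to the same JSON body, so both spellings should reach Slack with equivalent content.

```python
import json
import urllib.request


def build_slack_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build (but do not send) a POST carrying a Slack-style JSON body."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_slack_message(webhook_url: str, text: str, timeout: float = 10.0) -> int:
    """Send the message and return the HTTP status code.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses, so a
    404 like the one in this thread surfaces as an exception here.
    """
    request = build_slack_request(webhook_url, text)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Building the request separately from sending it makes it possible to inspect the method, headers, and body before anything goes over the network.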
Jean-Baptiste Six
11/22/2021, 2:39 PM
Jean-Baptiste Six
11/22/2021, 2:39 PM
Jean-Baptiste Six
11/22/2021, 2:40 PM
Kevin Kho