Ken Nguyen
09/24/2021, 5:42 PM
def post_to_slack(obj, old_state, new_state):
    # Notify failures
    if new_state.is_failed():
        msg = "Flow/Task {0} finished in state {1}".format(obj, new_state)
        secret_slack = fail_channel_webhook
        requests.post(secret_slack, json={"text": msg})
    # Notify runs
    if new_state.is_running():
        msg = "Flow/Task {0} is running".format(obj)
        secret_slack = running_channel_webhook
        requests.post(secret_slack, json={"text": msg})
    return new_state
When I tested it on tasks, I received a notification when the task started running and another when it failed. However, when I tested it on flows, I was only notified that the flow was running, not that it had failed, even though the flow's end result was "Flow run FAILED: some reference tasks failed."
What differs between a flow and a task that would keep this from working? Thanks in advance for your suggestions!
Kevin Kho
Ken Nguyen
09/24/2021, 5:49 PM
Ken Nguyen
09/24/2021, 5:50 PM
Kevin Kho
from prefect import Flow, task
import prefect

def post_to_slack(obj, old_state, new_state):
    # Notify failures
    logger = prefect.context.get("logger")
    if new_state.is_failed():
        logger.info("This failed")
    # Notify runs
    if new_state.is_running():
        logger.info("This is running")
    return new_state

@task(state_handlers=[post_to_slack])
def abc():
    raise ValueError()
    return 1

with Flow("test", state_handlers=[post_to_slack]) as flow:
    abc()

flow.run()
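One way to check the handler logic for both flows and tasks without a webhook is to separate "which message should be sent" from "sending it". The sketch below is only an illustration under assumptions: `FakeState`, `pick_notification`, `make_handler`, and `send` are hypothetical names, and `FakeState` is a stand-in that mimics only the `is_failed()` and `is_running()` checks used above. It is not Prefect's own state class.

```python
from typing import Callable, Optional, Tuple


class FakeState:
    """Hypothetical stand-in exposing only the two checks the handler uses."""

    def __init__(self, name: str) -> None:
        self.name = name

    def is_failed(self) -> bool:
        return self.name == "Failed"

    def is_running(self) -> bool:
        return self.name == "Running"

    def __repr__(self) -> str:
        return f"<{self.name}>"


def pick_notification(obj, new_state) -> Optional[Tuple[str, str]]:
    """Return (channel, text) for states worth reporting, else None."""
    if new_state.is_failed():
        return "fail", "Flow/Task {0} finished in state {1}".format(obj, new_state)
    if new_state.is_running():
        return "running", "Flow/Task {0} is running".format(obj)
    return None


def make_handler(send: Callable[[str, str], None]):
    """Build a handler with the (obj, old_state, new_state) signature."""

    def handler(obj, old_state, new_state):
        note = pick_notification(obj, new_state)
        if note is not None:
            send(*note)
        return new_state  # pass the state through unchanged

    return handler


# Record calls in a list instead of posting to a webhook.
sent = []
handler = make_handler(lambda channel, text: sent.append((channel, text)))

for name in ("Running", "Failed"):
    handler("test", FakeState("Pending"), FakeState(name))

print(sent)
```

With the real handler, `send` would wrap the `requests.post` call to the matching webhook. If the recorded list shows both transitions here but only one arrives in Slack, the difference is more likely in how the flow run reaches its final state or in the posting step than in the branching logic.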
Kevin Kho
Kevin Kho
Ken Nguyen
09/24/2021, 7:35 PM
Ken Nguyen
09/24/2021, 7:36 PM
Kevin Kho
prefect.context.get("flow_run_id")
Kevin Kho
Ken Nguyen
09/24/2021, 8:44 PM
Kevin Kho
Kevin Kho
run method.