Andres
04/04/2022, 11:54 AMSome reference tasks failed.
I was investigating a bit and seem that the state contains the result of all tasks while running it locally (state.result
) while on the server this is empty (i printed it using the logger) .
Any idea on how to address this?Anna Geller
04/04/2022, 12:27 PMimport json
import requests
import prefect
from prefect import task, Flow
from prefect.backend.flow_run import FlowRunView
def send_report_on_success(task, old_state, new_state):
if new_state.is_successful():
flow_run = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id"))
task_run = flow_run.get_task_run(task_run_id=prefect.context.get("task_run_id"))
result = task_run.get_result()
<http://requests.post|requests.post>(url="webhook_url", data=json.dumps({"text": result}))
return new_state
@task(state_handlers=[send_report_on_success])
def return_some_data():
return "Some result"
with Flow(name="state-handler-demo-flow") as flow:
result = return_some_data()
the easiest solution on Prefect Server would be using Cloud HooksAndres
04/04/2022, 12:34 PMSome reference tasks failed.
I get not context of the failure at all....Anna Geller
04/04/2022, 1:38 PMimport prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast
def post_to_slack_on_failure(task, old_state, new_state):
if new_state.is_failed():
if isinstance(new_state.result, Exception):
value = "```{}```".format(repr(new_state.result))
else:
value = cast(str, new_state.message)
msg = (
f"The task `{prefect.context.task_name}` failed "
f"in a flow run {prefect.context.flow_run_id} "
f"with an exception {value}"
)
SlackTask(message=msg).run()
return new_state
@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
return 1 / (b - a)
with Flow(name="state-inspection-handler") as flow:
result = divide_numbers(1, 1)
if __name__ == "__main__":
flow.run()
but you would need to attach this state handler to all tasks that need this type of notification, so not on a flow-levelKevin Kho
04/04/2022, 1:46 PMAndres
04/04/2022, 2:10 PMAnna Geller
04/04/2022, 6:51 PMAndres
04/05/2022, 12:51 PMtasks = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id")).get_all_task_runs()
failed_tasks = [task for task in tasks if isinstance(task.state, Failed)]
Now on failed_tasks I have the failed tasks with their information, so it's just matter to play around and display the message you want 🙂Anna Geller
04/05/2022, 2:09 PM