Arnon Kimhi
12/02/2021, 1:27 PMif new_state.is_successful():
flow_run_id = prefect.context.flow_run_id
flow_run_view = FlowRunView.from_flow_run_id(flow_run_id)
b_task_run = flow_run_view.get_task_run("b-1")
Anna Geller
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast
def post_to_slack_on_failure(task, old_state, new_state):
if new_state.is_failed():
if isinstance(new_state.result, Exception):
value = "```{}```".format(repr(new_state.result))
else:
value = cast(str, new_state.message)
msg = (
f"The task `{prefect.context.task_name}` failed "
f"in a flow run {prefect.context.flow_run_id} "
f"with an exception {value}"
)
SlackTask(message=msg).run()
return new_state
@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
return 1 / (b - a)
with Flow(name="state-inspection-handler") as flow:
result = divide_numbers(1, 1)
if __name__ == "__main__":
flow.run()
Arnon Kimhi
12/02/2021, 1:33 PMAnna Geller
def post_to_slack_on_failure(task, old_state, new_state):
and here:
@task(state_handlers=[post_to_slack_on_failure])
Arnon Kimhi
12/02/2021, 1:51 PMnew_state.result
in the case of flow handler?Arnon Kimhi
12/02/2021, 1:51 PMArnon Kimhi
12/02/2021, 1:51 PMArnon Kimhi
12/02/2021, 1:51 PMAnna Geller
@task
def get_task_run_result(
flow_run_id: str, task_slug: str, map_index: int = -1, poll_time: int = 5
)
Can you describe the problem you try to solve? Perhaps passing data from one task to another task and performing some checks this way would be easier to set up than a state handler?
from prefect import task, Flow
@task
def return_some_data():
return "some result"
@task
def check_result_from_another_task(x):
if x == "some result":
# do sth
else:
# do sth else
with Flow("checking-results") as flow:
data = return_some_data()
check_result_from_another_task(data)
Arnon Kimhi
12/02/2021, 2:15 PMAnna Geller