Zach Schumacher
06/03/2021, 3:40 PMZach Schumacher
06/03/2021, 3:40 PMnicholas
Zach Schumacher
06/03/2021, 4:10 PMfrom typing import Set, Optional
from prefect import task, Flow
from prefect.engine.state import State
from prefect.utilities.logging import get_logger
logger = get_logger("example-terminal-state-behavior")
def custom_terminal_state_handler(
flow: Flow,
state: State,
reference_task_states: Set[State],
) -> Optional[State]:
if state.is_failed():
for task, task_state in state.result.items():
<http://logger.info|logger.info>(f"task.name={task.name} state={task_state}")
return state
@task()
def abc(x):
return x
@task()
def bcd(x):
raise ValueError("Foo!")
@task()
def cde(x):
return x
with Flow(
"example-terminal-state-behavior", terminal_state_handler=custom_terminal_state_handler
) as flow:
a = abc(1)
b = bcd(a)
c = cde(b)
flow.run()
Zach Schumacher
06/03/2021, 4:11 PMZach Schumacher
06/03/2021, 4:11 PMZach Schumacher
06/03/2021, 4:13 PMZach Schumacher
06/03/2021, 4:13 PMnicholas
Zach Schumacher
06/03/2021, 4:31 PMnicholas
Zach Schumacher
06/03/2021, 5:18 PMnicholas
from prefect import Client
@task
class SlackNotifier(SlackTask):
def run(self, webhook_secret):
id = prefect.context.get("flow_run_id")
client = Client()
query = f"""
query { flow_run_by_pk(id: {id}) { id, state, task_run { id, state } }
"""
res = client.graphql(query)
# parse response for flow run state and task run states
self.message = "hello"
self.webhook_secret = webhook_secret
super(SlackNotifier, self).run()
Zach Schumacher
06/03/2021, 5:27 PMnicholas
Zach Schumacher
06/03/2021, 5:29 PMnicholas
Zach Schumacher
06/03/2021, 5:32 PMnicholas
Zach Schumacher
06/08/2021, 5:24 PMZach Schumacher
06/08/2021, 5:25 PMdef get_slack_state_handler(
webhook_url: str, message_factory: MessageFactory = _get_slack_message
) -> StateHandlerType: # pragma: no cover
"""
Inject in the slack webhook url (to make this un-opinionated on where it comes from) and also allow overriding
the how the slack message is created (via message_factory).
The default behavior is a slack message with one attachment for each prefect task.
"""
def slack_state_handler(flow: Flow, old_state: State, new_state: State):
"""
We have to fetch task status from the graph API when running in cloud (see issue referenced above). This
code can go away when terminal state handlers work correctly in cloud/k8s
"""
# this only works when running in prefect cloud
if new_state.is_finished() and os.getenv("ENVIRONMENT") in {"dev", "staging", "prod"}:
from prefect import context
flow_run_id = context.get("flow_run_id")
flow_run = get_flow_run(flow_run_id)
slack_message = message_factory(flow_run)
<http://sbrequests.post|sbrequests.post>(webhook_url, json=slack_message)
return slack_state_handler
Zach Schumacher
06/08/2021, 5:26 PMdef get_flow_run(flow_run_id: str) -> PrefectFlowRun:
client = Client()
response = client.graphql(
f"""
query {{
flow_run_by_pk ( id: "{flow_run_id}" ),
{{
id,
flow {{
id,
name,
version
}}
state,
start_time,
end_time,
state_message
task_runs {{
id,
state,
state_message,
start_time,
end_time,
task {{
id,
name,
slug,
type
}}
}}
}}
}}"""
)
return PrefectFlowRun.from_orm(response.data.flow_run_by_pk)
Zach Schumacher
06/08/2021, 5:35 PMKevin Kho