Sylvain Hazard
10/18/2021, 11:56 AM
Anna Geller
10/18/2021, 12:13 PM
Sylvain Hazard
10/18/2021, 12:19 PM
Anna Geller
10/18/2021, 12:22 PM
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from prefect.client import Client
# Module-level Slack notification task; summary_state_handler invokes it
# directly through its .run() method rather than adding it to the flow.
slack_task = SlackTask()
@task(log_stdout=True)
def hello_world():
    """Print a greeting; the only task in the example flow."""
    print("hello world")
def summary_state_handler(obj, old_state, new_state):
    """Query the finished flow run's task runs, then send a Slack message.

    Acts only on the transition from a running state to a finished state;
    every other transition is passed through untouched.

    Args:
        obj: The object whose state is changing (the flow, when this handler
            is registered through ``Flow(state_handlers=[...])``).
        old_state: The state being left.
        new_state: The state being entered.

    Returns:
        ``new_state``, unchanged.
    """
    if not (old_state.is_running() and new_state.is_finished()):
        return new_state

    # The query below filters `flow_run` by its id, so it needs the id of the
    # flow *run*. The context's "flow_id" identifies the flow, not the run.
    flow_run_id = prefect.context.get("flow_run_id")

    # NOTE(review): presumably the id is absent when the flow is not run
    # through a backend; skip the query instead of failing on None -- confirm.
    if flow_run_id is not None:
        query = (
            """
            query {
              flow_run(where: { id: {_eq: "%s"} }) {
                id
                task_runs {
                  id
                  name
                  serialized_state
                }
              }
            }
            """
            % flow_run_id
        )
        results = Client().graphql(query)
        print(results)

    slack_task.run(message="YOUR MESSAGE")
    return new_state
# Register the handler on the Flow itself (not on an individual task).
with Flow("summary-state-handler", state_handlers=[summary_state_handler]) as flow:
    hello_task = hello_world()
Sylvain Hazard
10/18/2021, 12:23 PM
Zanie
10/18/2021, 3:13 PM
from prefect.backend import FlowRunView
# Build a FlowRunView for the current run from the "flow_run_id" held in
# prefect.context. Indexing with [] raises KeyError if the key is missing.
flow_run = FlowRunView.from_flow_run_id(prefect.context["flow_run_id"])
Sylvain Hazard
10/18/2021, 3:31 PM