https://prefect.io logo
Title
s

Sylvain Hazard

10/18/2021, 11:56 AM
Got another quick question : what would be the best "prefectonic" way to access every other task from a given task ? For context, I'd like to make a task that recaps everything that happened in the others tasks and sends a Slack message with that content e.g. "Flow ran in X minutes, Y tasks failed, etc."
a

Anna Geller

10/18/2021, 12:13 PM
One possible solution that would work would be a Flow-level state handler that sends a message to Slack as soon as the Flow’s state is changing from Running to Finished. Then, to get more information to populate the slack message in the state handler, you would likely have to use GraphQL client to get more information about the flow run and the corresponding task runs.
s

Sylvain Hazard

10/18/2021, 12:19 PM
That makes a lot of sense ! Time to dig into GraphQL a little more then. Thanks a lot !
a

Anna Geller

10/18/2021, 12:22 PM
You can use this to get started:
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from prefect.client import Client


slack_task = SlackTask()


@task(log_stdout=True)
def hello_world():
    print("hello world")


def summary_state_handler(obj, old_state, new_state):
    if old_state.is_running() and new_state.is_finished():
        client = Client()
        flow_id = prefect.context.get("flow_id")  # you may need to retrieve it from a separate task
        query = """
                query { 
                    flow_run(where: { id: {_eq: \"""" + flow_id + """\"} })  {
                        id
                        task_runs {
                            id
                            name
                            serialized_state
                        }
                    }
                }
            """
        results = client.graphql(query)
        print(results)
        slack_task.run(message="YOUR MESSAGE")


with Flow("summary-state-handler", state_handlers=[summary_state_handler]) as flow:
    hello_task = hello_world()
s

Sylvain Hazard

10/18/2021, 12:23 PM
Wow thanks so much, looks like a great starting point !
👍 1
z

Zanie

10/18/2021, 3:13 PM
You can also use
from prefect.backend import FlowRunView

flow_run = FlowRunView.from_flow_run_id(prefect.context["flow_run_id"])
:upvote: 1
This object can query for information about flow runs / task runs / flows without writing any graphql
s

Sylvain Hazard

10/18/2021, 3:31 PM
Oh, that sounds way easier ! I'll check that out, thanks !