# prefect-server
s
Got another quick question: what would be the best "prefectonic" way to access every other task from a given task? For context, I'd like to make a task that recaps everything that happened in the other tasks and sends a Slack message with that content, e.g. "Flow ran in X minutes, Y tasks failed, etc."
a
One solution would be a Flow-level state handler that sends a message to Slack as soon as the Flow's state changes from Running to Finished. To populate the Slack message in the state handler, you would likely have to use the GraphQL client to get more information about the flow run and its task runs.
s
That makes a lot of sense! Time to dig into GraphQL a little more then. Thanks a lot!
a
You can use this to get started:
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from prefect.client import Client


slack_task = SlackTask()


@task(log_stdout=True)
def hello_world():
    print("hello world")


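def summary_state_handler(obj, old_state, new_state):
    # Only act once, when the flow run leaves Running for a finished state
    if old_state.is_running() and new_state.is_finished():
        client = Client()
        # The query filters on flow_run.id, so use the flow run ID, not the flow ID
        flow_run_id = prefect.context.get("flow_run_id")  # you may need to retrieve it from a separate task
        query = """
                query {
                    flow_run(where: { id: {_eq: "%s"} })  {
                        id
                        task_runs {
                            id
                            name
                            serialized_state
                        }
                    }
                }
            """ % flow_run_id
        results = client.graphql(query)
        print(results)
        slack_task.run(message="YOUR MESSAGE")
    # Return the state so the flow run continues with it
    return new_state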


with Flow("summary-state-handler", state_handlers=[summary_state_handler]) as flow:
    hello_task = hello_world()
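To turn the query result into the recap text, a small helper along these lines might do. This is only a sketch: `build_summary` is a hypothetical name, and it assumes each task run's `serialized_state` is a dict with a `type` key such as "Success" or "Failed", so check that against what the `print(results)` call actually shows. The duration is passed in by the caller because the query above does not fetch timestamps.

```python
from collections import Counter


def build_summary(task_runs, duration_minutes):
    """Build a one-line recap from a list of task-run dicts.

    Assumes each dict looks like
    {"name": ..., "serialized_state": {"type": ...}} (an assumption).
    """
    # Count task runs per state type; missing states are counted as "Unknown"
    counts = Counter(
        (run.get("serialized_state") or {}).get("type", "Unknown")
        for run in task_runs
    )
    failed = counts.get("Failed", 0)
    return (
        f"Flow ran in {duration_minutes:.1f} minutes, "
        f"{failed} of {len(task_runs)} tasks failed"
    )


# Hand-written data shaped like the query result (not real output)
example_runs = [
    {"name": "extract", "serialized_state": {"type": "Success"}},
    {"name": "load", "serialized_state": {"type": "Failed"}},
]
message = build_summary(example_runs, duration_minutes=2.5)
```

The returned string can then replace "YOUR MESSAGE" in the `slack_task.run` call.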
s
Wow, thanks so much, looks like a great starting point!
z
You can also use
import prefect
from prefect.backend import FlowRunView

flow_run = FlowRunView.from_flow_run_id(prefect.context["flow_run_id"])
This object can query for information about flow runs, task runs, and flows without writing any GraphQL.
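For the "Y tasks failed" part, something like this could work. It is a rough sketch that assumes the view exposes `get_all_task_runs()` and that each returned task run has a `state` with `is_failed()`; `count_failed` is a hypothetical helper name, so verify against your Prefect version.

```python
def count_failed(flow_run_view):
    """Return (failed, total) task-run counts for a FlowRunView-like object.

    Assumes `get_all_task_runs()` returns objects whose `.state`
    has an `is_failed()` method (an assumption about the backend API).
    """
    task_runs = flow_run_view.get_all_task_runs()
    failed = sum(1 for task_run in task_runs if task_run.state.is_failed())
    return failed, len(task_runs)
```

Inside a state handler you would pass it the `flow_run` object created above, e.g. `failed, total = count_failed(flow_run)`.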
s
Oh, that sounds way easier! I'll check that out, thanks!