Is there a way to feed a web hook into a specific ...
# ask-community
j
Is there a way to feed a web hook into a specific google sheet and cell? I.e. I'm tracking all pipelines in a G Sheet, I'd like Cell C3 to say success/failure and a timestamp, C4 to say it for a different flow, etc.
k
Is this meant to be just for your tracking purposes? Because you can use the GraphQL API to surface this information
Short answer is no though
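For the GraphQL route mentioned above, here is a minimal sketch of what the lookup might look like. It assumes Prefect 1.x with a Cloud or Server backend, and that the `flow_run` table exposes `name`, `state`, `start_time` and `end_time` with Hasura-style filters; the helper names `latest_run_query` and `fetch_latest_run` are made up for illustration and do not come from the thread.

```python
def latest_run_query(flow_name):
    """Build a GraphQL query for the most recent run of one flow."""
    return (
        "query { flow_run("
        f'where: {{flow: {{name: {{_eq: "{flow_name}"}}}}}}, '
        "order_by: {start_time: desc}, limit: 1) "
        "{ name state start_time end_time } }"
    )


def fetch_latest_run(flow_name):
    """Send the query to the backend (needs Prefect 1.x and API credentials)."""
    # Imported lazily so the query builder can be used without Prefect installed.
    from prefect import Client

    return Client().graphql(latest_run_query(flow_name))
```

Calling `fetch_latest_run("flowA")` from a scheduled script would give you the state and timestamps to report elsewhere; the field names are worth checking against the interactive API in the UI first.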
j
yeah, slack hook is getting a little crowded haha
k
Ah I see. In this case, some people have a “reporting flow” that will run (let’s say daily), get all the flows of that set, and then send a Slack message so that it’s aggregated
j
oh I like that
Could I build that into the "flow of flows" we had discussed?
k
I think so. If you run A, B, and C, you want reporting for those three?
j
Yeah so say I have 10 flows that are all dependent, I want them running in order, which the flow of flows does. I also want a system that says Report A, refresh success, 2AM, Report B, refresh success, 2:15AM, etc. The idea is that the G Sheet can be seen by many users and has summaries on the reports, when they run, etc.
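One way the sheet side of this could be sketched, assuming the update is written from Python (for example inside a reporting task) instead of arriving through a webhook. Nothing in the thread confirms this approach: the `CELL_FOR_FLOW` mapping, both function names, and the choice of the third-party `gspread` library with a service-account credentials file are all assumptions.

```python
from datetime import datetime

# Hypothetical mapping from flow name to the sheet cell that tracks it.
CELL_FOR_FLOW = {"flowA": "C3", "flowB": "C4"}


def cell_updates(results, finished_at):
    """Return {cell: text} for each (flow_name, succeeded) pair with a known cell."""
    stamp = finished_at.strftime("%Y-%m-%d %H:%M")
    updates = {}
    for flow_name, succeeded in results:
        cell = CELL_FOR_FLOW.get(flow_name)
        if cell is None:
            continue  # no cell assigned to this flow
        status = "success" if succeeded else "failure"
        updates[cell] = f"{status} {stamp}"
    return updates


def write_to_sheet(updates, sheet_title, credentials_path):
    """Write each cell value to the first worksheet of the named spreadsheet."""
    # Imported lazily: gspread is third-party and only needed for the real write.
    import gspread

    client = gspread.service_account(filename=credentials_path)
    worksheet = client.open(sheet_title).sheet1
    for cell, text in updates.items():
        worksheet.update_acell(cell, text)
```

Keeping the cell mapping separate from the write means the text that lands in C3 or C4 can be checked without touching the real sheet.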
k
So wait_for_flow_run has a raise_final_state argument. Set this to True. This means that if the sub-Flow Fails, the wait will Fail also.
So this means you can do something like:
with Flow(...) as flow:
    a = create_flow_run(...)
    wait_a = wait_for_flow_run(a, raise_final_state=True)
    b = create_flow_run(...)
    wait_b = wait_for_flow_run(b, raise_final_state=True)

    collect_all_subflows = [wait_a, wait_b, ...]
    print_to_slack(collect_all_subflows)
print_to_slack can use the SlackTask which lets you construct a message. And then you can just call the run method inside.
There is one caveat here though. If a task Fails, downstream tasks will not run because the dependency failed. You may need to use the always_run trigger.
j
Just what I need, thank you. As for the always_run, that would be within create_flow_run, right?
k
I nearly have an example done.
Took longer than expected lol. I think this is a good base:
from prefect import task, Flow
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.triggers import always_run
from prefect.tasks.notifications import SlackTask

@task
def get_values(x):
    if x == 2:
        raise ValueError()
    return x+1

@task(trigger=always_run)
def notifier(wait_a, wait_b, wait_c):
    # prefect.context.logger.info(wait_a)
    flows = [wait_a, wait_b, wait_c]
    msg = ""
    for flow in flows:
        msg = msg + f"{flow.name} had a state {flow.state} \n"
    prefect.context.logger.info(msg)
    SlackTask(message=msg).run()
    return msg

with Flow("flowA") as flow_a:
    get_values(1)

with Flow("flowB") as flow_b:
    get_values(2)

with Flow("flowC") as flow_c:
    get_values(1)

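# Let the built-in tasks run even when an upstream wait task has failed,
# so one failed sub-flow does not stop the rest of the chain.
create_flow_run.trigger = always_run
wait_for_flow_run.trigger = always_run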

with Flow("main") as flow:
    a = create_flow_run(flow_name="flowA", project_name="bristech")
    wait_a = wait_for_flow_run(a, raise_final_state=True)
    b = create_flow_run(flow_name="flowB", project_name="bristech", upstream_tasks=[wait_a])
    wait_b = wait_for_flow_run(b, raise_final_state=True)
    c = create_flow_run(flow_name="flowC", project_name="bristech", upstream_tasks=[wait_b])
    wait_c = wait_for_flow_run(c, raise_final_state=True)

    notifier(wait_a, wait_b, wait_c)

flow_a.register("bristech")
flow_b.register("bristech")
flow_c.register("bristech")
# flow.register("bristech")
flow.run()
j
hahhaha this is exactly what I need though! Thank you!
k
Of course! Inside notifier, I think you can check if something failed by doing:
if wait_a.state.is_failed():
    # add some different message
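To make the failure check a little more concrete, here is a minimal sketch of how the message-building part of notifier might branch on it. `FakeState` and `FakeRun` are hypothetical stand-ins so the snippet runs without a Prefect backend; in the real flow the objects would be whatever wait_for_flow_run returns, assumed here to expose `.name` and `.state.is_failed()` as used earlier in the thread. `build_message` and the wording of each line are also made up.

```python
from dataclasses import dataclass


@dataclass
class FakeState:
    """Stand-in for a Prefect state object (illustration only)."""
    failed: bool

    def is_failed(self):
        return self.failed


@dataclass
class FakeRun:
    """Stand-in for the object returned by wait_for_flow_run (illustration only)."""
    name: str
    state: FakeState


def build_message(runs):
    """Build one line per run, with different wording for failed runs."""
    lines = []
    for run in runs:
        if run.state.is_failed():
            lines.append(f"{run.name} FAILED, needs attention")
        else:
            lines.append(f"{run.name} did not fail")
    return "\n".join(lines)
```

Inside notifier, the returned string could be passed to SlackTask(message=...).run() in place of the msg built in the loop.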