Thread
#prefect-community
    Jason Motley

    9 months ago
    Is there a way to feed a webhook into a specific Google Sheet and cell? I.e., I'm tracking all pipelines in a G Sheet, and I'd like cell C3 to say success/failure and a timestamp, C4 to say it for a different flow, etc.
    Kevin Kho

    9 months ago
    Is this meant to be just for your tracking purposes? Because you can use the GraphQL API to surface this information
    Short answer is no though
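For reference, pulling the latest run state of a flow through the GraphQL API might look roughly like the sketch below. It assumes Prefect 1.x, where `prefect.Client().graphql(...)` sends a query to the backend. The `flow_run` fields used (`name`, `state`, `start_time`) and the helper names `build_latest_run_query` and `latest_run` are illustrative assumptions, not something from this thread, so check them against your backend's schema.

```python
import json

# Assumed query shape for a Prefect 1.x backend; verify the field names
# against your own schema before relying on it.
QUERY_TEMPLATE = """
query {
  flow_run(
    where: {flow: {name: {_eq: %s}}}
    order_by: {start_time: desc_nulls_last}
    limit: 1
  ) {
    name
    state
    start_time
  }
}
"""


def build_latest_run_query(flow_name):
    """Return a query for the most recent run of the named flow."""
    # json.dumps adds the quotes and escapes special characters in the name.
    return QUERY_TEMPLATE % json.dumps(flow_name)


def latest_run(client, flow_name):
    """Return the newest flow run record, or None if the flow has no runs.

    `client` is anything with a `graphql(query)` method, for example
    `prefect.Client()` in Prefect 1.x.
    """
    result = client.graphql(build_latest_run_query(flow_name))
    runs = result["data"]["flow_run"]
    return runs[0] if runs else None
```

A scheduled reporting script could call `latest_run(client, "flowA")` for each flow it tracks and write the returned `state` and `start_time` wherever the summary lives.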
    Jason Motley

    9 months ago
    yeah, slack hook is getting a little crowded haha
    Kevin Kho

    9 months ago
    Ah I see. In this case, some people have a “reporting flow” that will run (let’s say daily), get all the flows of that set, and then send a Slack message so that it’s aggregated
    Jason Motley

    9 months ago
    oh I like that
    Could I build that into the "flow of flows" we had discussed?
    Kevin Kho

    9 months ago
    I think so. If you run A, B, and C, you want reporting for those three?
    Jason Motley

    9 months ago
    Yeah, so say I have 10 flows that are all dependent. I want them running in order, which the flow of flows does. I also want a system that says: Report A, refresh success, 2 AM; Report B, refresh success, 2:15 AM; etc. The idea is that the G Sheet can be seen by many users and has summaries of the reports, when they run, etc.
    Kevin Kho

    9 months ago
    So `wait_for_flow_run` has a `raise_final_state`. Set this to True. This means that if the sub-Flow Fails, the `wait` will Fail also.
    So this means you can do something like:
    with Flow(...) as flow:
         a = create_flow_run(...)
         wait_a = wait_for_flow_run(a, raise_final_state=True)
         b = create_flow_run(...)
         wait_b = wait_for_flow_run(b, raise_final_state=True)
    
         collect_all_subflows = [wait_a, wait_b, ...]
         print_to_slack(collect_all_subflows)
    `print_to_slack` can use the `SlackTask`, which lets you construct a message. And then you can just call the `run` method inside.
    There is one caveat here though. If a task Fails, downstream tasks will not run because the dependency failed. You may need to use the `always_run` trigger.
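To make the caveat concrete, here is a toy model of the trigger check in plain Python. It is not Prefect's implementation: `all_successful` and `always_run` mirror the names in `prefect.triggers`, but the simplified signatures (a list of booleans) and the `should_run` helper are assumptions made for illustration.

```python
def all_successful(upstream_ok):
    """Default-style trigger: run only if every upstream task succeeded."""
    return all(upstream_ok)


def always_run(upstream_ok):
    """Run no matter how the upstream tasks finished."""
    return True


def should_run(trigger, upstream_ok):
    """Decide whether a downstream task runs, given upstream outcomes."""
    return trigger(upstream_ok)


# One upstream wait task failed (False), the other succeeded (True).
outcomes = [False, True]
print(should_run(all_successful, outcomes))  # False: the downstream task would not run
print(should_run(always_run, outcomes))      # True: the downstream task still runs
```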
    Jason Motley

    9 months ago
    Just what I need, thank you. As for the `always_run`, that would be within `create_flow_run`, right?
    Kevin Kho

    9 months ago
    I nearly have an example done.
    Took longer than expected lol. I think this is a good base:
    from prefect import task, Flow
    import prefect
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    from prefect.triggers import always_run
    from prefect.tasks.notifications import SlackTask
    
    @task
    def get_values(x):
        if x == 2:
            raise ValueError()
        return x+1
    
    @task(trigger=always_run)
    def notifier(wait_a, wait_b, wait_c):
        # prefect.context.logger.info(wait_a)
        flows = [wait_a, wait_b, wait_c]
        msg = ""
        for flow in flows:
            msg = msg + f"{flow.name} had a state {flow.state} \n"
        prefect.context.logger.info(msg)
        SlackTask(message=msg).run()
        return msg
    
    with Flow("flowA") as flow_a:
        get_values(1)
    
    with Flow("flowB") as flow_b:
        get_values(2)
    
    with Flow("flowC") as flow_c:
        get_values(1)
    
    create_flow_run.trigger = always_run
    wait_for_flow_run.trigger = always_run
    
    with Flow("main") as flow:
        a = create_flow_run(flow_name="flowA", project_name="bristech")
        wait_a = wait_for_flow_run(a, raise_final_state=True)
        b = create_flow_run(flow_name="flowB", project_name="bristech", upstream_tasks=[wait_a])
        wait_b = wait_for_flow_run(b, raise_final_state=True)
        c = create_flow_run(flow_name="flowC", project_name="bristech", upstream_tasks=[wait_b])
        wait_c = wait_for_flow_run(c, raise_final_state=True)
    
        notifier(wait_a, wait_b, wait_c)
    
    flow_a.register("bristech")
    flow_b.register("bristech")
    flow_c.register("bristech")
    # flow.register("bristech")
    flow.run()
    Jason Motley

    9 months ago
    hahhaha this is exactly what I need though! Thank you!
    Kevin Kho

    9 months ago
    Of course! Inside `notifier`, I think you can check if something failed by doing:
    if wait_a.state.is_failed():
        # add some different message