    karteekaddanki

    2 years ago
    Hey guys, is it possible to trigger one flow to run another? If so, is there a Prefect-recommended way of doing it? I have a workflow with some heavyweight tasks that rarely change (parsing and data normalization). The tasks downstream of these are driven by research and regularly need re-running. Since a flow seems to be the atomic unit of scheduling in Prefect, what is the best way to structure my workflow? Thanks in advance.
    Jim Crist-Harif

    2 years ago
    Hi @karteekaddanki, if I understand correctly, your root goal is to avoid re-running these expensive tasks? If so, Prefect has a few mechanisms for caching tasks to avoid rerunning them. You can then structure your whole workflow as a single flow and let Prefect determine when a task needs to be rerun.
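    To make the caching idea concrete, here is a minimal, library-agnostic sketch in plain Python. It is not Prefect's API: the `cached` decorator, `parse_and_normalize`, and `research_step` are hypothetical names. It only illustrates the behavior being described, where an expensive step is skipped when the same inputs were already processed and the stored result has not yet expired.

```python
import time
from functools import wraps


def cached(ttl_seconds, clock=time.monotonic):
    """Hypothetical decorator: reuse a result for identical inputs until it expires."""
    def decorator(fn):
        store = {}  # maps input key -> (expiry time, result)

        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Arguments must be hashable for this simple key to work.
            key = (args, tuple(sorted(kwargs.items())))
            now = clock()
            hit = store.get(key)
            if hit is not None and hit[0] > now:
                return hit[1]  # still fresh: skip the expensive call
            result = fn(*args, **kwargs)
            store[key] = (now + ttl_seconds, result)
            return result

        return wrapper
    return decorator


calls = []  # records every real execution of the expensive step


@cached(ttl_seconds=3600)
def parse_and_normalize(source):
    # Stand-in for the heavyweight parsing and normalization work.
    calls.append(source)
    return source.upper()


def research_step(source):
    # Downstream step that is rerun often; it reuses the cached parse.
    return len(parse_and_normalize(source))
```

    Calling `research_step` twice with the same input runs the expensive step only once; a new input triggers a fresh run.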
    Mark McDonald

    2 years ago
    I do it by having a task that makes a GraphQL call from the client:
    from prefect import Client, task
    from prefect.triggers import all_successful

    @task(name="Trigger downstream flows", trigger=all_successful)
    def trigger_downstream_flows():
        """
        Make a GraphQL call to trigger downstream flows.
        :return: None
        """
        c = Client()
        c.graphql("""
            mutation {
                create_flow_run(input: { version_group_id: "1234-abcd-1234-abcd" }) {
                    id
                }
            }
        """)
    Jim Crist-Harif

    2 years ago
    karteekaddanki

    2 years ago
    @Jim Crist-Harif This sounds like what I should be using, although my intuition was to use something similar to what @Mark McDonald suggested. I'd need to figure out a systematic way of clearing the cached results (I haven't read the link in detail yet) for the case when the parsing goes wrong.
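    One systematic way to clear cached results, sketched here in plain Python rather than with any Prefect API, is to tie each cached entry to a parser version. Bumping the version after fixing a parsing bug forces a recompute, and an explicit `clear` drops everything for one result. `VersionedCache` and its methods are hypothetical names used only for illustration.

```python
class VersionedCache:
    """Hypothetical cache whose entries are tied to a parser version."""

    def __init__(self):
        self._store = {}  # maps (name, version) -> cached result

    def get_or_compute(self, name, version, compute):
        """Return the cached result for (name, version), computing it on a miss."""
        key = (name, version)
        if key not in self._store:
            self._store[key] = compute()
        return self._store[key]

    def clear(self, name):
        """Drop every cached version of one named result."""
        for key in [k for k in self._store if k[0] == name]:
            del self._store[key]
```

    With this shape, a bad parse is recovered either by changing the version string passed to `get_or_compute` or by calling `clear` for the affected result before the next run.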
    Jim Crist-Harif

    2 years ago
    Yeah, we kind of leave it up to you to determine how to structure your flows/tasks. You might also use a separate flow, and use the FlowRunTask (https://docs.prefect.io/api/latest/tasks/prefect.html#flowruntask) to kick off a flow run if it needs to run. The caching mechanism can be useful here though, and would be the first thing I'd recommend if it works.
    karteekaddanki

    2 years ago
    Is FlowRunTask expected to update the runs of the child flow in Prefect Cloud? I'm not seeing it. I'm trying to implement backfill functionality using FlowRunTask via a generic wrapper task.
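    For the backfill part, a wrapper usually needs one set of parameters per historical period. The sketch below is plain Python and makes assumptions that are not from this thread: that the child flow takes a single `run_date` parameter, and that one child run is started per day. `backfill_parameters` is a hypothetical helper that only produces the parameter dicts; it does not start any runs.

```python
from datetime import date, timedelta


def backfill_parameters(start, end):
    """Hypothetical helper: one parameter dict per day in the inclusive range [start, end]."""
    if end < start:
        raise ValueError("end must not be before start")
    days = (end - start).days + 1
    return [
        # "run_date" is an assumed parameter name for the child flow.
        {"run_date": (start + timedelta(days=i)).isoformat()}
        for i in range(days)
    ]
```

    A generic wrapper task could then loop over these dicts and start one child flow run for each.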
    Jim Crist-Harif

    2 years ago
    FlowRunTask should kick off a new flow run in Prefect Cloud. However, depending on your cloud account you might have limits on the number of active flow runs, so you might not see that flow run start for a bit.