karteekaddanki

07/14/2020, 3:23 PM
Hey guys, is it possible to trigger one flow to run another? If so, is there a Prefect-recommended way of doing it? I have a workflow where there are some heavyweight tasks that do not change much with time (related to parsing and data normalization). Tasks downstream from these are driven by research and regularly require re-running. As a Flow seems to be atomic when it comes to scheduling in Prefect, what is the best way for me to structure my workflow? Thanks in advance.

Jim Crist-Harif

07/14/2020, 3:27 PM
Hi @karteekaddanki, if I understand correctly, your root goal is to avoid re-running these expensive tasks? If so, prefect has a few mechanisms for caching tasks to avoid rerunning - you can then structure your whole workflow as a single flow and let prefect determine when a task needs to be rerun.
👍 1
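To make the caching idea concrete, here is a minimal standard-library sketch of the behaviour being described. It is not Prefect's API: `cached_step`, `normalize`, and `max_age` are hypothetical names, and the in-memory dict stands in for whatever result store is actually used. The point is only that an expensive step is skipped when the same inputs were seen recently enough.

```python
import hashlib
import json
import time

_CACHE = {}  # key -> (timestamp, result); in-memory stand-in for a result store


def cached_step(func, max_age):
    """Wrap func so a call with the same inputs reuses a fresh-enough result."""
    def wrapper(*args):
        # Key on the function name plus its inputs, so changed inputs miss the cache.
        key = hashlib.sha256(
            json.dumps([func.__name__, args], sort_keys=True, default=str).encode()
        ).hexdigest()
        hit = _CACHE.get(key)
        if hit is not None and time.time() - hit[0] < max_age:
            return hit[1]  # cached result is still fresh: skip the expensive call
        result = func(*args)
        _CACHE[key] = (time.time(), result)
        return result
    return wrapper


calls = []  # records every real (non-cached) execution


def normalize(raw):
    calls.append(raw)
    return raw.strip().lower()


normalize_cached = cached_step(normalize, max_age=3600)

first = normalize_cached("  Raw DATA ")   # runs normalize
second = normalize_cached("  Raw DATA ")  # same input: served from the cache
third = normalize_cached("other")         # new input: runs normalize again
```

With this shape the whole pipeline can stay in one flow, and only the steps whose inputs changed (or whose cached result expired) do real work.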

Mark McDonald

07/14/2020, 3:27 PM
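I do it by having a task that makes a GraphQL call from the client:
from prefect import Client, task
from prefect.triggers import all_successful

@task(name="Trigger downstream flows", trigger=all_successful)
def trigger_downstream_flows():
    """
    Make a GraphQL call to trigger downstream flows.
    :return: None
    """
    # Client() reads the API endpoint and auth token from the Prefect config
    c = Client()
    # Create a flow run for the flow identified by this version group ID
    c.graphql("""
        mutation {
            create_flow_run(input: { version_group_id: "1234-abcd-1234-abcd" }) {
                id
            }
        }
    """)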

Jim Crist-Harif

07/14/2020, 3:28 PM
Yeah, we kind of leave it up to you to determine how to structure your flows/tasks. You might also use a separate flow, and use the FlowRunTask (https://docs.prefect.io/api/latest/tasks/prefect.html#flowruntask) to kick off a flow run if it needs to run. The caching mechanism can be useful here though, and would be the first thing I'd recommend if it works.
:upvote: 3

karteekaddanki

07/14/2020, 3:31 PM
@Jim Crist-Harif This sounds like what I should be using, although my intuition was to use something similar to what @Mark McDonald suggested. I'd need to figure out a systematic way of clearing the cached results (I haven't read the link in detail yet) for the case when the parsing goes wrong.
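On clearing cached results when parsing goes wrong: one possible approach, sketched here with the standard library only, is to fold a version tag into the cache key so that bumping the tag makes every old entry unreachable. `PARSER_VERSION`, `cache_key`, and `parse` are hypothetical names for illustration, and the dict is a stand-in for the real cache location; this is not how Prefect itself invalidates results.

```python
import hashlib

PARSER_VERSION = "v1"  # hypothetical tag; bump it when the parsing logic is fixed


def cache_key(step_name, input_id, version=PARSER_VERSION):
    """Build a cache key that changes whenever the version tag changes."""
    raw = f"{step_name}:{input_id}:{version}"
    return hashlib.sha256(raw.encode()).hexdigest()


store = {}  # stand-in for wherever results are cached


def parse(input_id, version=PARSER_VERSION):
    key = cache_key("parse", input_id, version)
    if key not in store:
        # Placeholder for the expensive parsing work.
        store[key] = f"parsed {input_id} with {version}"
    return store[key]


old = parse("2020-07-14.csv", version="v1")
again = parse("2020-07-14.csv", version="v1")  # same version: cache hit
new = parse("2020-07-14.csv", version="v2")    # bumped version: recomputed
```

Old entries are never deleted in this sketch; they simply stop being looked up, so a separate cleanup pass would still be needed to reclaim storage.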

karteekaddanki

07/17/2020, 12:42 PM
Is FlowRunTask expected to update the runs of the child flow in prefect cloud? I'm not seeing it. I'm trying to implement backfill functionality using FlowRunTask via a generic wrapper task.

Jim Crist-Harif

07/17/2020, 1:58 PM
FlowRunTask should kick off a new flow run in prefect cloud. However, depending on your cloud account you might have limits on the number of active flow runs, so you might not see that flow run start for a bit.
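For the backfill wrapper mentioned above, a rough standard-library sketch of the shape it might take: generate one parameter set per day and hand each to a trigger callable. The `run_date` parameter name and the `trigger` callable are assumptions for illustration; in a real flow the trigger would be whatever starts the child flow run (for example a FlowRunTask call passing these values as flow parameters), and the runs it creates may queue if the account limits concurrent flow runs.

```python
from datetime import date, timedelta


def backfill_parameters(start, end):
    """Return one parameter dict per day in [start, end], inclusive."""
    days = (end - start).days
    return [
        {"run_date": (start + timedelta(days=i)).isoformat()}
        for i in range(days + 1)
    ]


def run_backfill(start, end, trigger):
    """Call trigger(parameters=...) once per day and collect what it returns."""
    return [trigger(parameters=p) for p in backfill_parameters(start, end)]


# Stand-in trigger that only records the request; a real one would start a child flow run.
requested = []


def fake_trigger(parameters):
    requested.append(parameters)
    return f"run-for-{parameters['run_date']}"


run_ids = run_backfill(date(2020, 7, 14), date(2020, 7, 16), fake_trigger)
```

Keeping the trigger injectable makes the date logic testable without touching the API.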