# best-practices
a
how do i wait for completion of a subflow before running another subflow?
n
hi @Andrew Lawlor - this is how it works by default, so if you have the following:
Copy code
from prefect import flow

@flow
def bar():
    print("first!")

@flow
def baz():
    print("second!")

@flow(log_prints=True)
def foo():
    bar()
    baz()

if __name__ == "__main__":
    foo()
then bar will finish before baz starts because you call it first. since we're not dealing with tasks / task runners here, there aren't any futures involved - each subflow call blocks until it finishes, unless we want to use async / run_deployment
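for the async route, a rough sketch of what that could look like (assuming the subflows are written as async - same toy flows as above):
Copy code
import asyncio

from prefect import flow

@flow
async def bar():
    print("first!")

@flow
async def baz():
    print("second!")

@flow(log_prints=True)
async def foo():
    # await both subflows concurrently instead of running them one after another
    await asyncio.gather(bar(), baz())

if __name__ == "__main__":
    asyncio.run(foo())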
a
sorry i do want to use async / run_deployment
i want them to have different infrastructure than the parent flow, so i use run_deployment
n
gotcha, so with run_deployment it will work almost the same way
Copy code
from prefect import flow
from prefect.deployments import run_deployment

@flow
def foo():
    # returns immediately if timeout=0, otherwise blocks until the run reaches a terminal state
    subflow_1_run = run_deployment("someflow/somedeploy", parameters={...})

    if subflow_1_run.state.is_completed():
        subflow_1_result = subflow_1_run.state.result()  # requires result persistence on "someflow"
        subflow_2_run = run_deployment("someotherflow/someotherdeploy", parameters={"upstream_result": subflow_1_result, ...})
here I've added an (optional) check to make sure I only call the downstream one if the upstream one reached COMPLETED, as opposed to another terminal state like FAILED or CRASHED
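also worth noting: for the state.result() call above to work, "someflow" would need result persistence turned on - roughly something like this on that flow's side (just a sketch, the actual storage config depends on your setup):
Copy code
from prefect import flow

# persisting the result lets a parent flow fetch this flow run's return value
@flow(persist_result=True)
def someflow():
    return {"some": "result"}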
a
im seeing it block until the deployment is finished, but not the actual flow. though sometimes i do see flows start as CRASHED, then switch to RUNNING, for some reason - so maybe it is waiting until it gets to CRASHED
oh i see, im also doing submit so i can run some of these in parallel, bc i dont want my flows blocking everything, just some downstream ones
how would i achieve that?
n
hmm im not sure i understand your distinction here
> im seeing it block until the deployment is finished, but not the actual flow
> i dont want my flows blocking everything, just some downstream ones
would you be able to give a minimal concrete example of what you want to do? here's an extreme example of doing a bunch of subflows concurrently - I imagine you're looking for some combo of that and sequential calls like i have above?
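(as a rough illustration of "a bunch of subflows concurrently" via run_deployment - the deployment names here are made up:)
Copy code
import asyncio

from prefect import flow
from prefect.deployments import run_deployment

@flow
async def fan_out():
    # trigger several deployments and wait for all of them together
    runs = await asyncio.gather(
        *[run_deployment(f"someflow/deploy-{n}") for n in range(5)]
    )
    return [run.state for run in runs]

if __name__ == "__main__":
    asyncio.run(fan_out())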
a
so it would look something like below, ideally. i want b to depend on a, y to depend on x, and final-flow to depend on y and b. so it would be able to run a and x at the same time, then b once a is done (without waiting on x), and y once x is done (without waiting on a). then once all 4 of those flows are done, it can run the final-flow. the way it is coded below, as i understand it, it will block on each flow, so it will go a -> x -> b -> y -> final-flow, which will be a lot slower than my ideal solution. also, to be more accurate to my situation, we can say flow y is a memory-intensive process and needs a bigger instance than the other flows.
Copy code
from __future__ import annotations
from prefect import flow
from prefect.deployments import run_deployment


# Parent Flow
@flow(name="flow-of-flows")
def flow():
    flow_a_run = run_deployment.submit(
        name="flow-a"
    ).wait()
    flow_x_run = run_deployment.submit(
        name="flow-x"
    ).wait()
    flow_b_run = run_deployment.submit(
        name="flow-b"
    ).wait()
    flow_y_run = run_deployment.submit(
        name="flow-y"
    ).wait()
    final_flow_run = run_deployment.submit(
        name="final-flow"
    ).wait()

if __name__ == "__main__":
    res = flow()
n
gotcha, to be clear - run_deployment doesn't have submit / wait methods like that, only tasks have submit, which returns a PrefectFuture that you can call .result() on to wait for it. i think you might want something like this, where you wrap run_deployment calls in tasks to get the submit-like interface you wanted to use above, then you can just pass wait_for to your tasks like you normally would - make sense?
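something along these lines (a rough sketch - the deployment names are placeholders and this assumes the default concurrent task runner):
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger(deployment_name: str):
    # blocks until the triggered flow run reaches a terminal state, then returns the FlowRun
    return run_deployment(name=deployment_name)

@flow(name="flow-of-flows")
def flow_of_flows():
    # a and x start concurrently
    a = trigger.submit("flow-a/prod")
    x = trigger.submit("flow-x/prod")

    # b only waits on a, y only waits on x
    b = trigger.submit("flow-b/prod", wait_for=[a])
    y = trigger.submit("flow-y/prod", wait_for=[x])

    # final-flow waits on both branches
    final = trigger.submit("final-flow/prod", wait_for=[b, y])
    return final.result()

if __name__ == "__main__":
    flow_of_flows()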
a
ok let me see. thank you.