Andrew Lawlor
10/23/2023, 3:16 PM
Nate
10/23/2023, 4:26 PM
from prefect import flow

@flow
def bar():
    print("first!")

@flow
def baz():
    print("second!")

@flow(log_prints=True)
def foo():
    bar()
    baz()

if __name__ == "__main__":
    foo()
then bar will return before baz because you call it first. because we're not dealing with tasks / task runners here, there aren't any futures involved: each subflow call is blocking unless we want to use async / run_deployment
Andrew Lawlor
10/23/2023, 5:41 PM
Andrew Lawlor
10/23/2023, 5:42 PM
Nate
10/23/2023, 5:46 PM
from prefect import flow
from prefect.deployments import run_deployment

@flow
def foo():
    subflow_1 = run_deployment("someflow/somedeploy", parameters={...})  # returns immediately if timeout=0, else blocks until finished
    if subflow_1.state.is_completed():
        subflow_1_result = subflow_1.state.result()  # requires result persistence on "someflow"
        subflow_2_state = run_deployment("someotherflow/someotherdeploy", parameters={"upstream_result": subflow_1_result, ...})
here I've added an (optional) check to make sure I only call the downstream one if the upstream one was COMPLETED, as opposed to another terminal state like FAILED or CRASHED
Andrew Lawlor
10/23/2023, 5:49 PM
Andrew Lawlor
10/23/2023, 5:51 PM
Andrew Lawlor
10/23/2023, 5:52 PM
Nate
10/23/2023, 6:24 PM
I'm seeing it block until the deployment is finished, but not the actual flow
I don't want my flows blocking everything, just some downstream ones
would you be able to give a minimal concrete example of what you want to do? here's an extreme example of doing a bunch of subflows concurrently - I imagine you're looking for some combo of that and sequential calls like I have above?
Andrew Lawlor
10/25/2023, 2:29 PM
from __future__ import annotations

from prefect import flow
from prefect.deployments import run_deployment


# Parent Flow
@flow(name="flow-of-flows")
def flow():
    flow_a_run = run_deployment.submit(
        name="flow-a"
    ).wait()
    flow_x_run = run_deployment.submit(
        name="flow-x"
    ).wait()
    flow_b_run = run_deployment.submit(
        name="flow-b"
    ).wait()
    flow_y_run = run_deployment.submit(
        name="flow-y"
    ).wait()
    final_flow_run = run_deployment.submit(
        name="final-flow"
    ).wait()


if __name__ == "__main__":
    res = flow()
Nate
10/25/2023, 8:51 PM
run_deployment doesn't have submit / wait methods like that, only tasks have submit, which returns a PrefectFuture that you can call .result() on to wait for it
i think you might want something like this, where you wrap run_deployment calls in tasks to get the submit-like interface you wanted to use above, then you can just pass wait_for to your tasks like you normally would - make sense?
Andrew Lawlor
10/25/2023, 9:30 PM