hello, is it possible to run dependent flows (flow...
# ask-community
j
hello, is it possible to run dependent flows (flow of flows) like shown here https://docs.prefect.io/core/idioms/flow-to-flow.html without registering to the BE server. For example create the flow runs and call
Copy code
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


weekday_schedule = CronSchedule(
    "30 9 * * 1-5", start_date=pendulum.now(tz="US/Eastern")
)


with Flow("parent-flow", schedule=weekday_schedule) as flow:
    
    # assumes you have registered the following flows in a project named "examples"
    flow_a = create_flow_run(flow_name="A", project_name="examples")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    
    flow_b = create_flow_run(flow_name="B", project_name="examples")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
    
    flow_c = create_flow_run(flow_name="C", project_name="examples")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)
    
    flow_d = create_flow_run(flow_name="D", project_name="examples")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)
    
    b = wait_for_flow_b(upstream_tasks=[wait_for_flow_a])
    c = wait_for_flow_c(upstream_tasks=[wait_for_flow_a])
    d = wait_for_flow_d(upstream_tasks=[b, c])
flow.run()
z
You can run inner flows with
flow.run()
but then they won’t be tracked
j
when you say they won't be tracked you mean it wouldn't be flow of flows?
k
It won’t appear in the UI because it’s not a registered run
j
thats ok by me, but when i try to run the code above i get an error that is dont have flow_run_id
so the question is whether it is possible to run flow of flows with no server at all
k
Michael is suggesting to call the
flow.run()
directly instead of using the
create_flow_run
task cuz that code calls the backend API
If you all Python scripts directly, you can just use the
ShellTask
j
in case i use
flow.run()
it seems like
wait_for_flow_run
is not really working
k
wait_for_flow_run
takes in a flow id so it needs a backend. If you use the ShellTask though combined with
flow.run()
, I think it might wait for it
But you can chain them also if you import them from other files:
Copy code
from flow_one import flowA
from flow_two import flowB

flowA.run()
flowB.run()
j
when you say to use ShellTask you mean to call my python script through a shell task and then call the shell task script from command line
k
Yeah exactly
j
that will execute the tasks. But if I need a flow of flows where b and c are run in parallel it wouldn't be possible.
k
Did you try with the ShellTask and LocalDaskExecutor?
j
yes ill dm you the script