https://prefect.io logo
#prefect-community
Title
# prefect-community
r

R Zo

04/02/2022, 11:42 AM
Copy code
Hi prefect team,

I am trying to spin up a flow of flows, so I started a prefect server and agent with label "test". Below is a snippet of code that should run child_flow_1 which is a flow with multiple tasks. However child_flow_1 does not run while prefect quickly returns a few success logs and there is no error reported. What I am missing? I have run child_flow_1 without the use of flow of flows and it works fine. Following is a snippet of the log using Dask, but I have tried LocalDaskExecutor as well.

test_flows created
Flow URL: <http://localhost:8080/default/flow/cd6dc2bb-8f54-4f5a-b046-d90e5c853dbe>
 └── ID: e405d852-5c75-4001-815e-619687e592d3
 └── Project: test_flows
 └── Labels: ['mymachinexxx', 'test']
[2022-04-02 22:30:01+1100] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flows'
[2022-04-02 22:30:01+1100] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at <tcp://192.168.20.9:8786>
[2022-04-02 22:30:02+1100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


test_flows = make_three_flows()
for tfl in test_flows:
        tfl.register("test_flows")
        # tfl.executor = LocalDaskExecutor(scheduler="threads",num_workers=10)

with Flow("parent_flow", run_config=LocalRun(labels=["test"])) as parent_flow_complete:
        working_dir = Parameter("working_dir")  #
        create_flow_run(
            flow_name="child_flow_1",parameters={"working_dir": working_dir})

parent_flow_complete.executor = DaskExecutor(address="<tcp://localhost:8786>")
# parent_flow_complete.register("test_flows")
parent_flow_complete.run(parameters={"working_dir": working_dir},)
a

Anna Geller

04/02/2022, 1:02 PM
Two things may be helpful here: #1 When you run a flow-of-flows, it's best to do it backend-base so instead of doing any
flow.run()
, it's better to first register your child flows, then register the parent flow, and then trigger a parent flow run from a backend. You can do that using CLI:
Copy code
prefect run --name parent_flow --project test_flows
#2 When attaching executor, it's best to attach it to the Flow object to be 100% sure it gets stored in your Flow storage. This is important because Prefect retrieves the executor info at runtime from Storage.
Copy code
with Flow("parent_flow", executor = DaskExecutor(address="<tcp://localhost:8786>")) as flow:
lastly, you really don't seem to need a Dask executor here since you are just triggering a child flow run. Usually, you would only need that with mapping. If you need help, check out this full example - running
parent_flow_example.py
shows the full process
👀 1
👍 1
r

R Zo

04/03/2022, 11:42 AM
Thanks @Anna Geller, that information is very useful. Let me try it out and might come back with more questions.
👍 1
@Anna Geller,the flow of flows worked thanks for the example. I had a few more question: Does the executor get set on the parent flow for all child flows or can each child flow have a different executor ? Also in the example script the agent is the last subprocess called is there a way to tell the agent to quit after completing the flows ?
a

Anna Geller

04/05/2022, 12:13 PM
Nice work! Yes, each child flow can have a different executor. This subprocess was just a hack to show how the flow of flows work because you couldn't run this example with a local flow run - the flow-of-flows orchestrator pattern works only with the backend. Normally, you would need to have this agent running at all times in your production setting
👍 1
r

R Zo

04/05/2022, 11:03 PM
Thanks @Anna Geller.
10 Views