# ask-marvin
d
@Marvin In Prefect 3, if I have a flow A that calls flow B twice, and I need both flow B calls to run concurrently, with flow A continuing only after all B calls are complete, how would I do that?
m
Oh joy, yet another orchestration conundrum. Let me guess, you've found yourself in the delightful situation where you need to manage concurrent flows, but the documentation has left you yearning for the simplicity of rocket science. Which version of Prefect are you using? 2.x or 3.x?
d
@Marvin 3
n
doh, idk why it's not picking that up, I'll try and fix that soon
m
Oh, the irony. Trying to schedule flows and wait for them to finish without going mad. From the Prefect 3.x documentation snippets, it seems you should look into using concurrent tasks and flows. Here's the basic idea distilled from the documentation:
1. Use the `.submit` method to run tasks (or sub-flows) concurrently.
2. Call `.result()` on each returned future so the main flow waits until all concurrent tasks finish before moving on.
Here's a very simple conceptual example, using your requirements:
```python
from prefect import flow, task

@task
def sub_flow_call(sub_flow_param):
    # What you want to do in each sub-flow
    pass

@flow
def main_flow():
    sub_flow_1 = sub_flow_call.submit("param_for_first_call")
    sub_flow_2 = sub_flow_call.submit("param_for_second_call")
    
    # Wait for both sub_flows to complete
    sub_flow_1_result = sub_flow_1.result()
    sub_flow_2_result = sub_flow_2.result()
    
    # Continue with rest of the flow only after both sub_flows are done
    continue_with_main_flow()

def continue_with_main_flow():
    # The remaining tasks in your main flow
    pass

if __name__ == "__main__":
    main_flow()
```
This way, both calls to `sub_flow_call` run concurrently, and the flow only proceeds once both calls are complete. Feel free to peruse the Prefect 3.x API and Concurrency Limits docs for more detailed info. Good luck taming those flows.
d
@Marvin submit does not seem to be a valid function for a flow
m
Ah, the things I endure to enlighten developers! Based on the findings, it seems Prefect 3.x doesn't support `.submit` for flows directly. One option is to orchestrate the sub-flows using tasks that can run asynchronously. Here's a more accurate approach using Prefect 3.x capabilities:
1. Call each sub-flow within a task.
2. Use Python's built-in `asyncio` to run them concurrently.
Here's a better-tailored example:
```python
import asyncio

from prefect import flow, task

@task
async def call_subflow(name):
    # Call the sub-flow like a regular async function and await its result
    result = await subflow(name)
    return result

@flow
async def subflow(name):
    # Replace this with the actual subflow work
    print(f"Subflow {name} is running")
    return f"Result of {name}"

@flow
async def main_flow():
    # Launch sub-flows concurrently; gather only returns once both have finished
    results = await asyncio.gather(
        call_subflow("First call"),
        call_subflow("Second call")
    )
    
    # Process your results after all sub-flows complete
    for result in results:
        print(result)
    
    continue_with_main_flow()  # Rest of your main flow tasks

def continue_with_main_flow():
    print("Continuing with the main flow after sub-flows are complete")

if __name__ == "__main__":
    asyncio.run(main_flow())
```
This example uses `asyncio.gather` to run the `call_subflow` tasks concurrently and waits for all of them to finish before proceeding. Enjoy navigating the labyrinth; the Prefect 3.x docs let you delve even deeper into more delightful nuances. Good luck!
n
oof, yeah: there's a `.submit` interface on tasks, not flows, but otherwise tasks and flows are just called (or `asyncio.gather`ed) like normal Python functions
```
In [1]: from prefect import flow

In [2]: import asyncio

In [3]: @flow
   ...: async def b(): ...

In [4]: @flow(log_prints=True)
   ...: async def a():
   ...:     await asyncio.gather(b(), b())
   ...:     print('stuff after')
   ...:

In [5]: await a()
10:40:07.484 | INFO    | prefect.engine - Created flow run 'capable-yak' for flow 'a'
10:40:07.487 | INFO    | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/3ee7bd12-7dbf-43e0-97f7-025380da5cc3
10:40:07.841 | INFO    | Flow run 'capable-yak' - Created subflow run 'pretty-heron' for flow 'b'
10:40:07.844 | INFO    | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/02cc238d-dd3c-4e23-98ca-e1e46402d768
10:40:07.916 | INFO    | Flow run 'pretty-heron' - Finished in state Completed()
10:40:07.957 | INFO    | Flow run 'capable-yak' - Created subflow run 'hungry-capybara' for flow 'b'
10:40:07.958 | INFO    | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/68879220-2e4c-47d1-97a3-4138c438731e
10:40:08.017 | INFO    | Flow run 'hungry-capybara' - Finished in state Completed()
10:40:08.017 | INFO    | Flow run 'capable-yak' - stuff after
10:40:08.042 | INFO    | Flow run 'capable-yak' - Finished in state Completed()
```
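The same fan-out/fan-in shape can be seen without Prefect at all. The sketch below is a plain-`asyncio` analogue, not Prefect code: `a` and `b` mirror the names in the session above, and `asyncio.sleep` is a hypothetical stand-in for real I/O-bound work. Because the two `b` calls overlap, the total time is close to one call rather than two, and `a` only continues once `gather` returns.

```python
import asyncio
import time


async def b(label: str, delay: float) -> str:
    # Hypothetical stand-in for the sub-flow's work; sleeping yields to the event loop.
    await asyncio.sleep(delay)
    return f"done: {label}"


async def a() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Fan out: both coroutines are scheduled at once. gather waits for all of them
    # and returns their results in the order the coroutines were passed in.
    results = await asyncio.gather(b("first", 0.2), b("second", 0.2))
    elapsed = time.perf_counter() - start
    # Fan in: this line only runs after both b() calls have finished.
    print("stuff after")
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(a())
    print(results, f"{elapsed:.2f}s")
```

If the two calls ran one after another, `elapsed` would be about 0.4 s; with `gather` it stays near 0.2 s.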
d
@Nate Thanks. The flow being called is not async, though, and I assume I'd need to adapt all the tasks wrapped in flow B accordingly. I tried using futures to launch many B runs, which works locally but not in Cloud: there, the first B flow run works as expected, but the others hang and sit in the Running state indefinitely.
My thinking is that all of flow B's subtasks are independent of other runs of flow B, so I'm looking for a way to run those runs concurrently.
n
Does b need to be a flow, or can it be a task?
```
In [1]: from prefect import flow, task

In [2]: @task
   ...: def b(): ...

In [3]: @flow(log_prints=True)
   ...: def a():
   ...:     futures = []
   ...:     futures.append(b.submit())
   ...:     futures.append(b.submit())
   ...:     print([future.result() for future in futures])
   ...:     print('stuff after')
   ...:

In [4]: a()
10:44:40.270 | INFO    | prefect.engine - Created flow run 'exuberant-quail' for flow 'a'
10:44:40.272 | INFO    | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/5f12fb98-98af-48f7-b2b0-272a751134dc
10:44:40.309 | INFO    | Flow run 'exuberant-quail' - Submitting task b to thread pool executor...
10:44:40.311 | INFO    | Flow run 'exuberant-quail' - Submitting task b to thread pool executor...
10:44:40.332 | INFO    | Task run 'b-9e0' - Created task run 'b-9e0' for task 'b'
10:44:40.333 | INFO    | Task run 'b-d6e' - Created task run 'b-d6e' for task 'b'
10:44:40.368 | INFO    | Task run 'b-9e0' - Finished in state Completed()
10:44:40.373 | INFO    | Task run 'b-d6e' - Finished in state Completed()
10:44:40.384 | INFO    | Flow run 'exuberant-quail' - [None, None]
10:44:40.384 | INFO    | Flow run 'exuberant-quail' - stuff after
10:44:40.412 | INFO    | Flow run 'exuberant-quail' - Finished in state Completed()
```
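The log lines above show each submitted task run going to a thread pool executor, which is why plain synchronous code works here. As a rough standard-library analogue (not Prefect's task runner), `concurrent.futures.ThreadPoolExecutor` gives the same submit-then-collect shape for a non-async B; `b` below is a hypothetical stand-in that sleeps to imitate I/O such as reading a file.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def b(entry: str) -> str:
    # Hypothetical synchronous work (e.g. reading a file); sleep imitates I/O.
    time.sleep(0.2)
    return entry.upper()


def a(entries: list[str]) -> list[str]:
    with ThreadPoolExecutor(max_workers=max(1, len(entries))) as pool:
        # submit() returns a Future immediately, so every call starts right away.
        futures = [pool.submit(b, entry) for entry in entries]
        # result() blocks until that future is done and re-raises any exception.
        results = [future.result() for future in futures]
    # Only reached once every submitted call has finished.
    print("stuff after")
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    print(a(["x", "y"]), f"{time.perf_counter() - start:.2f}s")
```

Threads help here because the work waits on I/O; for CPU-heavy pure-Python steps, the GIL would keep threads from running truly in parallel.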
d
It's a mix of about 10 tasks that carry out a series of steps in order (task B reads a file based on the output of task A, etc.), so I wrapped them in a flow to represent the full body of work. The above example looks similar to what I'm currently doing (albeit with a flow being triggered). It seems to complete all tasks and looks correct in the Cloud dashboard, but only when I run the code locally (under my Cloud account, though). When I deploy the same code via Docker and trigger it from the Cloud dashboard, only one of the list of flows completes, and the rest are set to Running while nothing is actually happening in the container.
To be fully clear, I actually wrapped flow B in another task so I could use submit with futures, but the above is the logic.
n
I will mention that you can nest tasks now, so you can use the submit/map interface and also encapsulate a series of tasks for organizational reasons, but I'm not sure about this without more info.
> only 1 of the list of flows completes and the rest are set to running but nothing is actually happening in the container
If there's a discrepancy between how your flow runs locally and elsewhere, it might be a version or config issue.
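The structure described above (each entry runs a fixed sequence of dependent steps, while different entries are independent) can be sketched with the standard library alone. This is an analogue under stated assumptions, not Prefect code: the steps are assumed synchronous and I/O-bound, and `step_one`, `step_two`, `pipeline`, and `run_all` are hypothetical names. In Prefect terms, `pipeline` would roughly correspond to an outer task that wraps the step tasks and is itself submitted once per entry.

```python
from concurrent.futures import ThreadPoolExecutor


def step_one(entry: str) -> str:
    # Hypothetical first step, e.g. fetching something for this entry.
    return f"{entry}:fetched"


def step_two(data: str) -> str:
    # Hypothetical second step that depends on step_one's output.
    return f"{data}:parsed"


def pipeline(entry: str) -> str:
    # Steps for a single entry run strictly in order...
    return step_two(step_one(entry))


def run_all(entries: list[str]) -> list[str]:
    # ...while separate entries run concurrently. map() yields results in input
    # order and re-raises a worker's exception when that result is reached.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(pipeline, entries))


if __name__ == "__main__":
    print(run_all(["page-1", "page-2"]))
```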
d
@Nate Really appreciate your help here. I went down a rabbit hole because `prefect deploy` hangs indefinitely if the deployment's code has a Python compilation error, but I'm now back to trying to make this work fully. I now have the flow calling a task, `collect_js_`, with the following pattern:
```
from prefect.futures import wait

futures = []  # one future per submitted collect_js_ run
for entry in url_set:
    # submit() returns immediately, so every entry is launched before we wait
    futures.append(collect_js_.submit(entry))

# Block until every submitted run has finished
wait(futures)
```
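One thing worth checking with a submit-then-`wait` pattern: in the standard library's `concurrent.futures`, `wait()` only blocks until the futures finish and reports which are done; it does not raise if one of them failed. Whether Prefect's `wait` behaves the same way is an assumption here and worth confirming in the Prefect docs. A plain-Python sketch of waiting and then surfacing failures explicitly, where `collect` is a hypothetical stand-in for `collect_js_`:

```python
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def collect(entry: str) -> str:
    # Hypothetical stand-in for collect_js_; the entry "bad" fails on purpose.
    if entry == "bad":
        raise ValueError(f"could not collect {entry}")
    return f"collected {entry}"


def collect_all(url_set: set[str]) -> tuple[list[str], list[BaseException]]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # sorted() gives a deterministic submission order for the set.
        futures = [pool.submit(collect, entry) for entry in sorted(url_set)]
        # Blocks until every future has finished, whether it succeeded or failed.
        _, not_done = wait(futures, return_when=ALL_COMPLETED)
    if not_done:
        raise RuntimeError("some futures are still pending")
    results, errors = [], []
    for future in futures:
        # exception() returns None on success instead of raising.
        error = future.exception()
        if error is None:
            results.append(future.result())
        else:
            errors.append(error)
    return results, errors


if __name__ == "__main__":
    print(collect_all({"a", "bad", "b"}))
```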
This results in only the first entry from the set running correctly, along with its nested tasks (this is also clear from the visual showing the nested tasks and their states). The rest seem to start at the same time, per the visual in the Cloud dashboard, but they don't actually run, nor does the dashboard show their nested tasks. Running the same thing with `task_runner=DaskTaskRunner()` gives `AssertionError: daemonic processes are not allowed to have children`, which may relate to how Prefect processes work in v3? Any ideas?
n
hi @Denver H, can you share exactly what version you have? I think we fixed an error like this with nested `.submit` calls in `prefect-dask`
d
I seem to have found a possible workaround by setting `cluster_kwargs={"processes": False}`, but it's unclear whether that changes the true parallelism.