Hello again. How can I run the same Flow parallell...
# best-practices
m
Hello again. How can I run the same Flow parallelly? I have used code from https://discourse.prefect.io/t/how-can-i-run-multiple-subflows-or-child-flows-in-parallel/96 and it works. But when I simplify it and modify to run same Flow (not different 4) then I get
Copy code
RuntimeError("The task runner is already started!")
Modified code below:
Copy code
import asyncio
from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_1()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
a
I can reproduce the issue and I can confirm that it's either a bug or that the tutorial must be updated if there is another way of doing that. But I'd like to know why would you want to do it this way? Do you really want to run the same subflow twice in parallel with the same parameters? I'm curious what is your use case for that? As a temporary solution, turning that into a task instead of a subflow will work
m
Nah, i would like to run the same flow with different parameters
a
can you explain your use case a bit more? are those subflows containing a lot of tasks? if you would build those as tasks, then the issue becomes much simpler, but I can understand why subflows may sometimes be better
m
Yep, I'm just checking if we can achieve our goals - and as mentioned in other thread: https://prefect-community.slack.com/archives/C03D12VV4NN/p1654007524220189?thread_ts=1654004474.016759&cid=C03D12VV4NN we will probably need to use combination of nested Flows/Tasks to make it possible that we have reusable code that we can track on graph with desired granularity. If it won't be possible maybe logs will be enough for us, but then there is this problem with logger for DaskTaskRunner 😉 So I'm just checking what's possible and what is not
Possibility of seeing details of flow on low level is really nice, but for that, as I understand, I need to use Flow->Taks->Flow->Task... combination
a
not necessarily - the problem you see here is only when running a single subflow multiple times in parallel - if you don't run those in parallel, it works fine - this works and satisfies your modularity use case:
Copy code
import asyncio
from prefect import flow

@flow 
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow 
async def main_flow():
    for _ in range(5):
        await subflow_1()

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
@Marvin open "Running the same subflow concurrently multiple times raises `RuntimeError("The task runner is already started!")`"
m
this seems to do the trick, at least to be a functioning workaround, but will have to test it further: copying the flow with deepcopy
Copy code
import asyncio
import copy
from prefect import flow

@flow
async def subflow_1(a):
    print("Subflow 1 started!" + a)
    await asyncio.sleep(1)


@flow
async def main_flow():
    parallel_subflows = [subflow_1('a'), copy.deepcopy(subflow_1)('b')]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Copy code
21:11:23.753 | INFO    | prefect.engine - Created flow run 'lyrical-beluga' for flow 'main-flow'
21:11:23.757 | INFO    | Flow run 'lyrical-beluga' - Using task runner 'ConcurrentTaskRunner'
21:11:23.796 | WARNING | Flow run 'lyrical-beluga' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
21:11:24.077 | INFO    | Flow run 'lyrical-beluga' - Created subflow run 'steadfast-waxbill' for flow 'subflow-1'
Subflow 1 started!a
21:11:24.279 | INFO    | Flow run 'lyrical-beluga' - Created subflow run 'venomous-ibex' for flow 'subflow-1'
Subflow 1 started!b
21:11:25.223 | INFO    | Flow run 'steadfast-waxbill' - Finished in state Completed()
21:11:25.409 | INFO    | Flow run 'venomous-ibex' - Finished in state Completed()
21:11:25.450 | INFO    | Flow run 'lyrical-beluga' - Finished in state Completed('All states completed.')
a
thanks for this addition - it would be best if we continue on the GitHub issue so that updates are there directly
m
ok