<@ULVA73B9P> How can I run subflows (@flow) in par...
# ask-marvin
w
@Marvin How can I run subflows (@flow) in parallel like we do with @task? Using @task I would write something like this:
@task()
def bar(arg1, arg2):
    print(f"Bar {arg1, arg2}")
    return "Value"

@task()
def bar2(arg1, arg2, arg3):
    print(f"Bar {arg1, arg2, arg3}")

@flow 
def acme():
    for i in [{"arg1": 1, "arg2": 2},  {"arg1": 2, "arg2": 4}]:
        result = bar.submit(**i)
        result2 = bar2.submit(**i, arg3=result)
Please show me the equivalent of this, but using @flow on functions bar and bar2.
m
In Prefect, you can run subflows (flows within a flow) in parallel by using the map function. Here's how you can modify your code to achieve this:
from prefect import Flow, Parameter, unmapped
from prefect.tasks.control_flow import merge

@flow
def bar(arg1, arg2):
    print(f"Bar {arg1, arg2}")
    return "Value"

@flow
def bar2(arg1, arg2, arg3):
    print(f"Bar {arg1, arg2, arg3}")

@flow 
def acme():
    args = Parameter('args', default=[{"arg1": 1, "arg2": 2},  {"arg1": 2, "arg2": 4}])
    result = bar.map(arg1=[i['arg1'] for i in args], arg2=[i['arg2'] for i in args])
    result2 = bar2.map(arg1=[i['arg1'] for i in args], arg2=[i['arg2'] for i in args], arg3=result)
    merge(result, result2)
In this code, bar.map and bar2.map will run the bar and bar2 flows for each set of arguments in parallel. The merge function is used to ensure that all tasks have completed before the flow finishes. Please note that this is a simplified example and actual implementation may vary based on your specific requirements. For more information, you can refer to this discussion on the Prefect community forum.
👎 1
w
'Flow' object has no attribute 'map'
j
It’s in the docs but a little hidden!
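For anyone landing here later: one approach that should work in Prefect 2 is to declare the subflows as async and run them concurrently with asyncio.gather. Below is a minimal sketch under that assumption. The run_pair helper is a hypothetical name introduced only for illustration, and the try/except fallback exists only so the snippet still runs where Prefect is not installed.

```python
import asyncio

try:
    from prefect import flow
except ImportError:
    # Assumption: a no-op stand-in so the sketch runs without Prefect installed.
    def flow(fn):
        return fn


@flow
async def bar(arg1, arg2):
    print(f"Bar {arg1, arg2}")
    return "Value"


@flow
async def bar2(arg1, arg2, arg3):
    print(f"Bar {arg1, arg2, arg3}")


async def run_pair(kwargs):
    # Hypothetical helper: bar2 needs bar's result, so each pair runs in order,
    # while separate pairs run concurrently with each other.
    result = await bar(**kwargs)
    await bar2(**kwargs, arg3=result)
    return result


@flow
async def acme():
    items = [{"arg1": 1, "arg2": 2}, {"arg1": 2, "arg2": 4}]
    # gather schedules one run_pair coroutine per item and waits for all of them.
    return await asyncio.gather(*(run_pair(i) for i in items))


if __name__ == "__main__":
    print(asyncio.run(acme()))
```

Note that this gives concurrency on a single event loop, so it helps most when the subflows spend their time waiting on I/O. CPU-bound subflows would need a different mechanism.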