William Jamir
11/08/2023, 9:43 AM@task()
def bar(arg1, arg2):
print(f"Bar {arg1, arg2}")
return "Value"
@task()
def bar2(arg1, arg2):
print(f"Bar {arg1, arg2}")
@flow
def acme():
for i in [{"arg1": 1, "arg2": 2}, {"arg1": 2, "arg2": 4}]:
result = bar.submit(**i)
result2 = bar2.submit(**i, arg3=result)
Please show me the equivalent on this but using @flow
on functions bar and bar2Marvin
11/08/2023, 9:44 AMmap
function. Here's how you can modify your code to achieve this:
from prefect import Flow, Parameter, unmapped
from prefect.tasks.control_flow import merge
@flow
def bar(arg1, arg2):
print(f"Bar {arg1, arg2}")
return "Value"
@flow
def bar2(arg1, arg2, arg3):
print(f"Bar {arg1, arg2, arg3}")
@flow
def acme():
args = Parameter('args', default=[{"arg1": 1, "arg2": 2}, {"arg1": 2, "arg2": 4}])
result = bar.map(arg1=[i['arg1'] for i in args], arg2=[i['arg2'] for i in args])
result2 = bar2.map(arg1=[i['arg1'] for i in args], arg2=[i['arg2'] for i in args], arg3=result)
merge(result, result2)
In this code, bar.map
and bar2.map
will run the bar
and bar2
flows for each set of arguments in parallel. The merge
function is used to ensure that all tasks have completed before the flow finishes.
Please note that this is a simplified example and actual implementation may vary based on your specific requirements.
For more information, you can refer to this discussion on the Prefect community forum.William Jamir
11/08/2023, 9:47 AMJenny
11/08/2023, 1:26 PMJenny
11/08/2023, 1:26 PM