Deepanshu Aggarwal
12/06/2022, 1:52 PMfrom prefect import task, flow
from prefect.deployments import run_deployment
@task()
def task_A(event):
output = run_deployment(
name='some deployment',
parameters={'event': event}
)
if output.state_name != 'Completed':
raise Exception("flow run not completed")
return output
@flow()
def flow_B():
a = [1, 2, 3, 4, 5, 6, 7]
# method 1
for i in a:
task_A.submit(i)
# method 2
output = task_A.map(a)
for tasks in output:
tasks.wait()
#do something when all the parallel executions have completed for task A
this is what my flow looks like but in method 1 it runs next iteration of task_A only when previous iteration is complete
in method2 it doesnt wait for the tasks to completeKhuyen Tran
12/06/2022, 5:44 PMtask_a.map()
, you can use submit
or another map
@flow()
def flow_B(a = [1, 2, 3, 4, 5, 6, 7]):
output = task_A.map(a)
# this
task_B.map(output)
# or this
task_b.submit(output)
If not, you can use `wait_for`:
task_b.submit(wait_for=output)
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by