Hi ! i was trying to write a flow which runs a deployment (wrapped under a task) parallely for a se...
d
Hi ! i was trying to write a flow which runs a deployment (wrapped under a task) parallely for a set of input ( in array) . and i want to perform certain actions only after all these runs have completed.
Copy code
from prefect import task, flow
from prefect.deployments import run_deployment


@task()
def task_A(event):
    output = run_deployment(
        name='some deployment',
        parameters={'event': event}
    )
    if output.state_name != 'Completed':
        raise Exception("flow run not completed")
    return output


@flow()
def flow_B():
    a = [1, 2, 3, 4, 5, 6, 7]
    # method 1
    for i in a:
        task_A.submit(i)

    # method 2
    output = task_A.map(a)
    for tasks in output:
        tasks.wait()

    #do something when all the parallel executions have completed for task A
this is what my flow looks like but in method 1 it runs next iteration of task_A only when previous iteration is complete in method2 it doesnt wait for the tasks to complete
1
k
If you are trying to run deployments in parallel, method 2 seems like a solution then. If the downstream task uses the outputs of
task_a.map()
, you can use
submit
or another
map
Copy code
@flow()
def flow_B(a = [1, 2, 3, 4, 5, 6, 7]):
    output = task_A.map(a)

    # this
    task_B.map(output)

    # or this
     task_b.submit(output)
If not, you can use `wait_for`:
Copy code
task_b.submit(wait_for=output)