# prefect-cloud
c
Hello! I am working with Push Workpools. I have one deployment (an orchestrator) which will run several (differently parameterised) instances of a second deployment. I would like the orchestrator to trigger all the deployments in parallel, then wait for them to complete. How might I accomplish this? I have tried using `prefect.deployments.run_deployment`, but each instance of this blocks until the triggered deployment completes. I want to trigger all deployments, then poll them until they all complete...
k
there are a couple ways to solve this but the approach I'd suggest depends on your design requirements. do you need the results from these deployment runs or just to wait until they all finish?
c
@Kevin Grismore hello! I'm not currently passing results back. I would like to, ultimately, pass back a small dictionary. I'm currently peering at options using asyncio to wrap instances of `run_deployment()`... is that a potential approach?
k
it is! are you looking at `gather`?
c
... not yet ... furiously googling
k
`run_deployment` is an async function, so if you call it from an async context without `await`ing it, it'll return a coroutine. `gather` can accept many coroutines as input, and when `gather` is `await`ed, it'll block until it has collected the results of all those coroutines.
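[Editor's note: a minimal sketch of the `asyncio.gather` pattern described above. `run_deployment_stub` and `orchestrate` are hypothetical stand-ins; in real Prefect code you would `await prefect.deployments.run_deployment(name=..., parameters=...)` instead.]

```python
import asyncio

# Stand-in for prefect.deployments.run_deployment -- the real call would be
# `await run_deployment(name=..., parameters=...)`.
async def run_deployment_stub(name: str, parameters: dict) -> dict:
    await asyncio.sleep(0.01)  # simulate a deployment run
    return {"name": name, "parameters": parameters}

async def orchestrate(param_sets: list[dict]) -> list[dict]:
    # Build the coroutines without awaiting them individually...
    coros = [run_deployment_stub(f"run-{i}", p) for i, p in enumerate(param_sets)]
    # ...then gather awaits them all concurrently and returns results in order.
    return await asyncio.gather(*coros)

results = asyncio.run(orchestrate([{"x": 1}, {"x": 2}, {"x": 3}]))
```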
if you're not familiar with async and would rather not get into it, you can place your call to `run_deployment` inside a task and call the task with `.submit()` to submit it to the concurrent task runner.
then you'll need to return the results out of `run_deployment`, and then again out of your task, if you want to use them elsewhere
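[Editor's note: Prefect's concurrent task runner behaves much like a thread pool here, so the submit-then-collect pattern can be sketched with the standard library. `trigger_deployment_stub` is a hypothetical stand-in for a `@task`-decorated wrapper whose body would call `run_deployment` and return its result.]

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a @task-decorated wrapper around run_deployment; in Prefect the
# task body would call run_deployment(name=..., parameters=...) and return it.
def trigger_deployment_stub(name: str, parameters: dict) -> dict:
    return {"name": name, "parameters": parameters}

with ThreadPoolExecutor() as pool:
    # submit() returns a future immediately, so all runs are triggered up front...
    futures = [pool.submit(trigger_deployment_stub, f"run-{i}", {"x": i}) for i in range(3)]
    # ...and result() blocks until each one has finished.
    results = [f.result() for f in futures]
```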
c
@Kevin Grismore ok. Lots of good stuff here. I have:
• `asyncio` is more powerful, but has a learning curve
• concurrent task runner is quicker, but less feature-rich
Many thanks. These were exactly the pointers I was hoping for. I'll have a look and experiment with these options.
💙 1
@Kevin Grismore If I could prevail on your expertise some more... I'm trying to use the concurrent task runner solution:
@task(log_prints=True)
def trigger_flow_deployment(nde_noop_elt_job_uuid, run_parameters):
    rc = pd1.run_deployment(
        name=nde_noop_elt_job_uuid,
        parameters=run_parameters
    )
    return rc
invoked with
@flow(log_prints=True, task_runner=ConcurrentTaskRunner())
def async_manage_noop_dag_multi_task(<various args>) -> None:
    ...
    <various setup stuff>
    ...
    for rec in run_list:
        run_parameters = <param builder>
        rc = trigger_flow_deployment(nde_noop_elt_job_uuid, run_parameters).submit()
    ...
    <some post-op stuff>
However, when running, I see a failure log:
AttributeError: 'FlowRun' object has no attribute 'submit'
... what am i missing.... 😞
k
rc = trigger_flow_deployment.submit(nde_noop_elt_job_uuid, run_parameters)
c
facepalm 😳
k
don't worry lol I've been there plenty of times
this doesn't cover everything though, `run_deployment` returns a `FlowRun` which you'll need to get the result from
c
Yup - hadn't got that far yet... I wanted to get the concurrency up and running first - that is critical
✅ 1
@Kevin Grismore ... and thanks. That worked very nicely indeed. I'm going to give the asyncio a shot now....
💙 1
a
Old thread but I wanted to point out that `run_deployment` returns immediately if you set the timeout to zero. This allows you to use it to start a flow run and save the ID without waiting for the run to complete. E.g., `run_deployment(…, timeout=0)`
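[Editor's note: a sketch of the fire-then-poll pattern this enables. `run_deployment_stub` and `StubFlowRun` are hypothetical stand-ins; in real Prefect code, `run_deployment(..., timeout=0)` returns a `FlowRun` immediately, and you would re-fetch each run's state via the Prefect client to poll for completion.]

```python
import time

# Stand-in for the FlowRun object returned by run_deployment(..., timeout=0).
class StubFlowRun:
    def __init__(self, run_id: str):
        self.id = run_id
        self._polls = 0

    def is_final(self) -> bool:
        # Pretend the run reaches a final state on its second poll.
        self._polls += 1
        return self._polls >= 2

# Stand-in for run_deployment; with timeout=0 the real call returns at once.
def run_deployment_stub(name: str, parameters: dict, timeout: int = 0) -> StubFlowRun:
    return StubFlowRun(f"{name}-id")

# Trigger everything up front, saving each run's ID...
runs = [run_deployment_stub(f"run-{i}", {"x": i}, timeout=0) for i in range(3)]
run_ids = [r.id for r in runs]
# ...then poll until every run reaches a final state.
while not all(r.is_final() for r in runs):
    time.sleep(0.01)
```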