d
What's the correct way to spin up sub flows in parallel (like truly parallel, not concurrent with `asyncio`)? I'm doing the following for now, but it does not preserve flow-subflow relationships (and that makes sense, because the parent process isn't passing any Prefect information to the child process in multiprocessing). The alternative here would be to use a Dask runner for all the subflows, in which case every task would run truly in parallel, but all the subflows would be in the same process as the parent flow. However, it feels more natural to map flows to processes, rather than tasks to processes.
```
import multiprocessing

from prefect import flow
from sub_flow import sub_main  # the subflow defined in sub_flow.py

def sub_flow_wrapper(foo):
    sub_main(foo)

@flow
def main():
    with multiprocessing.Pool() as pool:
        # map, not starmap: each call takes a single argument
        pool.map(sub_flow_wrapper, [foo1, foo2, foo3])
```
n
hi @Dev Dabke, for CPU-bound stuff we do generally recommend Dask over manual multiprocessing. if you want to use Dask across many processes, i might explore having 2 deployments: one parent that farms out batches of work to a child deployment via `run_deployment`, where the child can have a Dask task runner
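a rough sketch of that shape (the flow and deployment names here are placeholders, not anything from your setup):
```
from prefect import flow
from prefect.deployments import run_deployment
from prefect_dask import DaskTaskRunner

# child flow, deployed on its own: its tasks run in parallel on Dask
@flow(task_runner=DaskTaskRunner())
def child(batch: list):
    ...

@flow
def parent(batches: list):
    for batch in batches:
        # each call becomes an independent flow run that a worker
        # executes in its own process
        run_deployment(name="child/child-deployment", parameters={"batch": batch})
```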
d
So, I am using deployments that way
But if I want to test something locally first, I can't easily use deployments, since there's a whole build/deploy process involved.
Basically, when I'm testing locally, I want to run locally. When something finally goes to production, I can use run_deployment because the necessary containers have been built and the flows have been deployed to prefect cloud.
n
for local testing, you could serve the same child flow as a test version of that deployment, which would allow you to submit runs to that local process from the parent with `run_deployment`. `.serve` is nice for testing deployment mechanics like triggers and ad hoc runs without engaging with CI / workers etc
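for example (minimal sketch, placeholder names again):
```
from prefect import flow

@flow
def child(batch: list):
    ...

if __name__ == "__main__":
    # serve the flow from this local process; the parent can then
    # target it with run_deployment(name="child/child-test")
    child.serve(name="child-test")
```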
d
Ahhh thanks! This solution looks workable (though there are some mechanics to figure out, e.g., if we change the code, do we have to call `.serve` again?)
One quick follow-up here: `run_deployment` is `async`, but I'm a bit confused about its usage here. The ideal behavior would be that I can process it in an asyncio gather, but it seems as though `await` doesn't do anything. In particular, if I just call `run_deployment`, it blocks instead of returning a promise. I understand that I can set a `timeout`, but it's not quite what I want. Looking at the source, it would be great if this function just returned (instead of awaiting):
```
client.create_flow_run_from_deployment(
    deployment.id,
    parameters=parameters,
    state=Scheduled(scheduled_time=scheduled_time),
    name=flow_run_name,
    tags=tags,
    idempotency_key=idempotency_key,
    parent_task_run_id=parent_task_run_id,
    work_queue_name=work_queue_name,
    job_variables=jv,
)
```
Any thoughts on what to do here? I could just inline the function and have it return what I need.
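Something like this, i.e., just lifting the client call out of `run_deployment` into a helper (rough sketch; `create_run_only` is my own hypothetical name for it):
```
from prefect.client.orchestration import get_client
from prefect.states import Scheduled

# hypothetical helper: create the flow run and return immediately,
# skipping run_deployment's wait-for-terminal-state loop
async def create_run_only(deployment_id, parameters=None):
    async with get_client() as client:
        return await client.create_flow_run_from_deployment(
            deployment_id,
            parameters=parameters,
            state=Scheduled(),
        )
```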
n
> The ideal behavior would be that I can process it in an asyncio gather, but it seems as though `await` doesn't do anything.
sorry, can you explain what behavior you're looking for? by default, `run_deployment` will block until the underlying flow run is in a terminal state, so you can await `run_deployment` calls sequentially, or `asyncio.gather` those if you want. alternatively you can pass `timeout=0` to just create the run and return the `FlowRun` that has its ID, so you can fetch it later after polling for state yourself or something
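concretely, both modes look something like this (sketch, with a placeholder deployment name):
```
import asyncio
from prefect.deployments import run_deployment

async def run_children(param_sets):
    # blocks until every child flow run reaches a terminal state
    return await asyncio.gather(
        *(run_deployment(name="child/child-test", parameters=p) for p in param_sets)
    )

async def launch_children(param_sets):
    # timeout=0 returns each FlowRun right after creation;
    # check flow_run.state yourself later
    return [
        await run_deployment(name="child/child-test", parameters=p, timeout=0)
        for p in param_sets
    ]
```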
d
I'll experiment more with this to figure out something ergonomic that will work for us
And will follow up on this thread with what we find
r
I just asked a similar question, Dev: https://prefect-community.slack.com/archives/C03D12VV4NN/p1717161816216209 Have you done any additional testing on your end? Does your experience with using `run_deployment` for sub-flows match mine?