Dev Dabke
05/17/2024, 3:24 PM
asyncio)? I'm doing the following for now, but it does not preserve flow-subflow relationships (which makes sense, because the parent process isn't passing any Prefect information to the child process in multiprocessing). The alternative here would be to use a Dask runner for all the subflows, in which case every task would run truly in parallel, but all the subflows would be in the same process as the parent flow. However, it feels more natural to map flows to processes, rather than tasks to processes.
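import multiprocessing

from prefect import flow
from sub_flow import sub_main  # assumption: sub_flow.py defines the sub_main flow

def sub_flow_wrapper(foo):
    sub_main(foo)

@flow
def main():
    with multiprocessing.Pool() as pool:
        # map rather than starmap: each item is a single argument
        pool.map(sub_flow_wrapper, [foo1, foo2, foo3])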
Nate
05/17/2024, 4:29 PM
Dev Dabke
05/17/2024, 4:30 PM
Dev Dabke
05/17/2024, 4:30 PM
Dev Dabke
05/17/2024, 4:31 PM
Nate
05/17/2024, 4:32 PM
Dev Dabke
05/17/2024, 4:52 PM
.serve again?)
Dev Dabke
05/17/2024, 8:17 PM
run_deployment is async, but I'm a bit confused about its usage here. Ideally I could run these calls in an asyncio.gather, but it seems as though await doesn't do anything. In particular, if I just call run_deployment, it blocks instead of returning a promise. I understand that I can set a timeout, but that's not quite what I want.
Looking at the source, it would be great if this function just returned (instead of awaiting):
client.create_flow_run_from_deployment(
    deployment.id,
    parameters=parameters,
    state=Scheduled(scheduled_time=scheduled_time),
    name=flow_run_name,
    tags=tags,
    idempotency_key=idempotency_key,
    parent_task_run_id=parent_task_run_id,
    work_queue_name=work_queue_name,
    job_variables=jv,
)
Dev Dabke
05/17/2024, 8:17 PM
Nate
05/17/2024, 9:06 PM
await doesn't do anything.
sorry can you explain what behavior you're looking for?
by default, run_deployment will block until the underlying flow run is in a terminal state, so you can await run_deployment calls sequentially or asyncio.gather those if you want
alternatively you can say timeout=0 to just create the run and return the FlowRun that has its ID, so you can fetch it later after polling for state yourself or something
Dev Dabke
05/20/2024, 2:48 AM
Dev Dabke
05/20/2024, 2:48 AM
Richard Alexander
05/31/2024, 1:26 PM
run_deployment for sub-flows match mine?
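For readers skimming the thread, here is a rough sketch of the two patterns Nate describes: awaiting several run_deployment calls together with asyncio.gather, and passing timeout=0 to get the run back without waiting. So that it runs without a Prefect server, it uses a hypothetical stand-in coroutine, fake_run_deployment, that only mimics the behavior described in the thread. The stand-in, its dict return value, and the deployment name "sub-main/demo" are all assumptions for illustration; in real code you would presumably swap in prefect.deployments.run_deployment.

```python
import asyncio
import time
from typing import Optional


async def fake_run_deployment(
    name: str, parameters: dict, timeout: Optional[float] = None
) -> dict:
    """Hypothetical stand-in for prefect.deployments.run_deployment.

    With timeout=None it waits until the simulated run is finished;
    with timeout=0 it returns the freshly created run immediately.
    """
    run = {"deployment": name, "parameters": parameters, "state": "SCHEDULED"}
    if timeout == 0:
        return run  # created but not awaited, as with timeout=0 in the thread
    await asyncio.sleep(0.2)  # simulate waiting for a terminal state
    run["state"] = "COMPLETED"
    return run


async def run_all(foos):
    # gather starts every call before waiting, so the waits overlap
    # instead of happening one after another.
    return await asyncio.gather(
        *(fake_run_deployment("sub-main/demo", {"foo": foo}) for foo in foos)
    )


def demo():
    start = time.perf_counter()
    runs = asyncio.run(run_all([1, 2, 3]))
    elapsed = time.perf_counter() - start
    return runs, elapsed
```

Awaiting the three calls one after another would take roughly three times the simulated wait, while the gathered version takes about one. With timeout=0 the caller gets the run back right away and is responsible for polling its state later.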