# ask-community
Chris Arderne
I'm trying to understand the best way to scale out our tasks on GCP. It seems like there are two paradigms, either an `Agent` or an `Executor` (or both). I read this SO answer already, along with this blog post on using an ECS Agent, which doesn't mention Dask.

Background: our workflows are heterogeneous between tasks (some just moving stuff around, some very parallel, some requiring GPU) and between runs (sometimes much more parallel), and very bursty. Some tasks are very long-lived and will spawn cloud containers of their own, but I assume Prefect doesn't need to care about that (unless we wanted those to be able to log back to Prefect).

VertexAgent/VertexRun
Following the blog post, we could set up a Vertex `Agent` (just merged on GitHub, not on PyPI yet) as the agent. We could then set up a Flow-of-Flows (since `VertexRun` parameters can only be specified at the Flow level), specifying the image, CPU etc. for each flow. This would by default use a `LocalExecutor`, so any parallelism within that Flow would simply run sequentially? But each separate run of that Flow would automatically spin up the needed instances.

DaskExecutor
If we wanted Task-level parallelism, we'd need to use the `DaskExecutor`? So then we'd have a Vertex instance running the `Agent`, a bunch of Vertex instances running `Flow`s, and then a Dask cluster running parallelised `Task`s? Have I understood that correctly? Would there be a way to avoid Dask by e.g. parallelising at the flow-to-flow interface?
Kevin Kho
Hey @Chris Arderne, so your thinking is completely right. You can set the resources on the Flow level upon registration. The default is indeed `LocalExecutor`, but we commonly see `LocalDaskExecutor` + `ECSRun` to utilize all of the cores of the machine that the flow is running on. It also helps to specify `num_workers`. `LocalDaskExecutor` is already enough to use all the cores of that machine. If you are using Vertex + `DaskExecutor`, it would normally point at an external cluster, like `DaskExecutor(cluster_address_here)`; a `DaskExecutor` on the Vertex compute itself would not give you anything over the `LocalDaskExecutor`.

On the last question of avoiding Dask by parallelizing at the Flow level: when you do `StartFlowRun` or `create_flow_run`, it won't wait for the subflow to complete by default. So the behavior you should see is kicking off one Flow run and then immediately moving on to kick off the next ones. So yes, this spins up a bunch of Vertex jobs running simultaneously.
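A minimal sketch of that per-Flow setup, assuming Prefect 0.15+ (the image and `machine_type` values for `VertexRun` are illustrative, and since the Vertex agent is so new it's worth checking the arguments against the version you actually install):

```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import VertexRun

@task
def transform(x):
    return x * 2

with Flow("vertex-local-dask") as flow:
    transform.map(x=list(range(10)))

# Resources are chosen per Flow (via the run config) at registration time;
# the executor only controls parallelism *within* a single flow run.
flow.run_config = VertexRun(
    image="gcr.io/my-project/my-image:latest",  # hypothetical image
    machine_type="n1-standard-8",               # hypothetical machine type
)
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# For a real distributed cluster you would point DaskExecutor at it instead,
# e.g. flow.executor = DaskExecutor(address="tcp://<scheduler-host>:8786")

# flow.register(project_name="my-project")
```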
Chris Arderne
Great, thanks @Kevin Kho! Will see if I can get something running without Dask — the parallelism will be anywhere from 1 to ~100 between runs, so `LocalDask` wouldn't help much. I haven't played much with the flow-to-flow idiom, but it seems like it should be possible to do the following four `Flow`s:
1. “Master” flow gets kicked off by the `Agent`.
2. It starts a simple sequential flow that prepares a bunch of things and determines how parallel the subsequent steps will be.
3. The result from that is passed to a `StartFlowRun` with `.map()`, so a bunch of parallel `Flow` runs are kicked off on separate Vertex instances.
4. Results from those are reduced back down to a single sequential `Flow` that summarises and e.g. inserts results into our database.

So all of these would use `wait=True`, but the middle Flow would be parallelised… Does that make sense/should that work?
Kevin Kho
It’s not straightforward to get results from subflows because the process doesn’t happen on the same machine, so there is no in-memory passage of data like you normally have when everything is in the same script. In 0.15.0 and above, we have three tasks that give you a lot more granularity and flexibility: `create_flow_run`, `wait_for_flow_run` and `get_task_run_result`. You would use `get_task_run_result` to fetch the result from another flow, but you need to know the task slug of the task whose result you want ahead of time.
So for some subflows you can use `wait_for_flow_run`, and for others you can just create the run and not wait.
Instead of `get_task_run_result`, you can also have those subflows persist their data somewhere and then load it in the main flow after waiting for completion.
Chris Arderne
Yeah passing data back is not actually too important, just knowing they're done so the next tasks can kick off.
I saw those newer tasks; it wasn't very clear what the difference is, but I'll go play with them. Thanks for your help @Kevin Kho!
Kevin Kho
`StartFlowRun` was too overloaded (sometimes it returns the child flow run id, sometimes it returns the state if you wait).
The newer tasks are more consistent in what they return.