# prefect-community
j
Hey everyone - I have reached out a few times over the last few days looking for suggestions about how to run up to 3000 distinct runs in parallel in the most efficient way. I see there is a new
Dask Cloud Provider Environment
and want to know if this env would be the best way to accomplish this goal
n
Hi @John Ramirez! That seems like a viable option given your use case and I believe other users are handling their infrastructure similarly; I can't make a determination that it's the best way or not but it's definitely a way. Perhaps some others will chime in on whether they've been deploying with DCP/Fargate and what they've seen
j
When you say 3000 distinct runs in parallel, do you mean 3000 parallel flow runs, or 3000 parallel mapped tasks? The dask-cloudprovider environment stands up a new dask cluster on a cloud provider (AWS only currently). If AWS works for you, this is one way of getting a dask cluster going per flow run with minimal configuration. The dask-kubernetes environment is another. Note though that each of these spins up a new dask cluster per flow run. If your flow runs are small and don't need to be parallelized, starting a dask cluster per flow run can be relatively expensive (excess resources you may not need). But if your flow has many tasks that could be run in parallel, using a dask-backed environment can be quite beneficial. In short:
• Use a Dask environment (like the dask-cloudprovider environment) if you need task-level parallelism
• Use a scalable agent (like the fargate agent or k8s agent) if you need flow-run level parallelism
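To make the distinction above concrete, here is a minimal sketch that uses only the Python standard library as a stand-in. It is not Prefect code: `evaluate` and `run_flow` are hypothetical names, and the thread pool only plays the role a Dask cluster would play for mapped tasks inside one flow run.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def evaluate(item: int) -> int:
    """Hypothetical unit of work, standing in for one mapped task."""
    return item * item


def run_flow(items: List[int], max_workers: int = 4) -> List[int]:
    """Stand-in for ONE flow run that fans out over its inputs.

    Task-level parallelism: the pool plays the role of a Dask cluster
    executing the mapped tasks of a single flow run.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the inputs.
        return list(pool.map(evaluate, items))


# Flow-run-level parallelism would instead mean many independent calls to
# run_flow, each one launched separately by a scalable agent.
results = run_flow([1, 2, 3])
```

If each call to `run_flow` is small, the per-run cluster is wasted overhead; if each call fans out widely, the per-run cluster is where the speedup comes from.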
upvote 2
j
So here is the dilemma I face. I have a flow that is compute heavy, processing/aggregating historical data and generating results for a single day. For me the easiest thing to do is to have a way to run multiple flows in parallel for each date. Or I can refactor the flow to process the date range and provide results for the range which will take longer.
j
@John Ramirez Jim's description above is excellent and should help you decide on an approach. (@Jim Crist-Harif that description is such a good way to think about it, maybe it should be added to docs somewhere!) I worked on
DaskCloudProviderEnvironment
and am happy to answer questions about it, but maybe give us a quick update on which approach that Jim laid out might be a better fit for you and we could provide more thoughts.
upvote 1
@John Ramirez with your current model, using a scalable agent (at a minimum) seems to be the more appropriate approach. This would allow your many Flow runs to execute in parallel. Do you have any task parallelism (e.g. you use
map()
) within your Flow?
👍 1
(And are you using a cloud provider like AWS, Azure, GCP?)
j
I am using AWS EKS and dask within the cluster to run the flows. I do use
map()
extensively because the flow consumes a parameter file with 9-10 distinct combinations to evaluate.
j
Since you're already using EKS, then I recommend you stick with kubernetes agents and environments. So the
DaskKubernetesEnvironment
would be my recommendation.
Switching to
DaskCloudProviderEnvironment
won't get you anything, it's just a way to deploy dask on different infrastructure.
👍 1
Does that make sense?
d
Two cents here: I am not sure I totally understand your use case, but we also run intensive jobs on several hundred input files. We have several hundred or several thousand Tasks but only a handful of Flows. For resource management, we use the DaskKubernetesEnvironment.
upvote 1
j
ok let me step back for a moment. Running 3000 flow runs is NOT a common occurrence. this is what you could call an ad-hoc evaluation run. I might do this once every 4-6 months and I don’t need the capacity to support this in EKS all the time. My hope was that I could use
DaskCloudProviderEnvironment
to run these ad-hoc runs on demand.
j
Sure. Running with
DaskCloudProviderEnvironment
would be one way to manage this then, since you'd be deploying those flows outside the EKS cluster. Another way would be to configure a scalable node pool in EKS, and continue using
DaskKubernetesEnvironment
- when under load (say kicking off 3000 jobs) EKS will scale up, but will scale back down when not needed. Either option would work. I think keeping everything in k8s is slightly simpler, but that's up to you.
👍 1
upvote 1
j
Actually I have a dask cluster running in EKS and am using the
RemoteEnvironment
NOT the
DaskKubernetesEnvironment
. It was never stable enough and would constantly fail.
j
@John Ramirez that is similar to our original infrastructure: we have our own k8s cluster, we create the Dask cluster ourselves (with k8s deployments/services) and scale workers manually. (Using https://fluxcd.io/ FWIW) Then we use RemoteEnvironment to run flows on that Dask cluster. If you wanted to stay with k8s, another option is to install Dask Gateway to handle dynamic creation of Dask clusters, either manually or in a custom environment per flow run. (See this thread from another Prefect user who's already using Dask Gateway and might create a custom environment for it.)
If you were open to moving outside of k8s, you could try out
DaskCloudProviderEnvironment
. If you wanted to pursue that route, I'd recommend starting by using the Dask Cloud Provider project directly, making sure you can create and access a Dask cluster with it successfully, and then trying out
DaskCloudProviderEnvironment
.
To Jim's important point above,
DaskCloudProviderEnvironment
does come with significant overhead in Flow run startup time due to Fargate allocating resources and pulling docker images. We currently see about 5 minutes of latency at the start of Flow runs because these 3 things happen sequentially:
1. Fargate Agent launches a Fargate task for the flow run. (We can reduce this a bit if we use EC2 launch type instead of Fargate and have an ec2 instance registered with ECS, but then you're losing the serverless aspect of Fargate.)
2. Flow run calls execute() in
DaskCloudProviderEnvironment
which creates a Fargate task for the Dask scheduler. It has to wait for the scheduler to be created in order to get the scheduler address to start the workers with.
3. Finally, start Fargate task(s) for the Dask worker(s).
If you're not sensitive to this startup latency, then this is a very nice approach that is easy to configure, but it does come with this overhead.
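A quick way to judge whether that startup latency matters is to compare it with how long a flow run actually computes. The sketch below assumes the roughly 5 minute figure quoted above. The compute durations are made-up values for illustration, and `startup_overhead_fraction` is a hypothetical helper, not part of Prefect or Dask.

```python
STARTUP_MINUTES = 5.0  # approximate startup latency quoted above


def startup_overhead_fraction(compute_minutes: float,
                              startup_minutes: float = STARTUP_MINUTES) -> float:
    """Return the share of total wall-clock time spent on startup."""
    if compute_minutes < 0 or startup_minutes < 0:
        raise ValueError("durations must be non-negative")
    total = compute_minutes + startup_minutes
    return startup_minutes / total if total else 0.0


# Hypothetical run lengths in minutes, chosen only for illustration.
for compute in (5, 55, 295):
    share = startup_overhead_fraction(compute)
    print(f"{compute:>4} min of compute: {share:.1%} of wall-clock time is startup")
```

A flow run that computes for only a few minutes spends a large share of its wall-clock time waiting on Fargate, while a multi-hour run barely notices it.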
j
And what does the group think about a refactor of the flow? Is it worth refactoring my code base so the workflow accepts a date range rather than spawning 3000 flow runs, given this is an ad-hoc situation?
j
I really can't say without knowing more details. I can give you some things to think about though:
• In prefect, a flow run is a single unit with a final state (e.g. Success or Failure). If your 3000 flow runs really comprise a single job, it might make more sense to manage them as one large flow run.
• A flow run is always going to be a more heavyweight thing than a task run. If each flow run is sufficiently long, the overhead of starting a new flow run will be negligible compared to the total computation time, but as flow runs get shorter (or if flow runs share calculations), it might make sense to merge multiple flow runs into a larger flow.
• In the absence of result persistence (e.g. with a ResultHandler), if you need to re-run a flow, it will need to recompute all its comprising tasks. So if 1 of the 3000 flow runs fails, restarting it is easy. If they're part of a larger flow you'll either need to enable some kind of persistence, or change the parameters before rerunning to avoid recomputing.
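There is also a middle ground between 3000 single-day flow runs and one giant flow run: batch the dates into ranges and start one run per range. The sketch below uses only the standard library. The start date, the chunk size of 100, and the helper `date_ranges` are all hypothetical, and how each range would be passed to a flow as parameters is left out.

```python
from datetime import date, timedelta
from typing import List, Tuple


def date_ranges(start: date, n_days: int, chunk_size: int) -> List[Tuple[date, date]]:
    """Split n_days consecutive dates into inclusive (first, last) ranges."""
    if n_days < 0 or chunk_size < 1:
        raise ValueError("n_days must be >= 0 and chunk_size must be >= 1")
    ranges = []
    for offset in range(0, n_days, chunk_size):
        # The final chunk may be shorter than chunk_size.
        length = min(chunk_size, n_days - offset)
        first = start + timedelta(days=offset)
        last = first + timedelta(days=length - 1)
        ranges.append((first, last))
    return ranges


# One run per day versus one run per 100-day range (hypothetical values).
per_day = date_ranges(date(2012, 1, 1), 3000, 1)
batched = date_ranges(date(2012, 1, 1), 3000, 100)
print(len(per_day), "single-day runs vs", len(batched), "batched runs")
```

Smaller chunks keep a failed run cheap to restart; larger chunks amortize the per-run startup cost over more work.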
💯 1