
Joe Schmid

01/21/2020, 4:39 PM
Question on Dask work stealing, mapped task distribution, and adaptive mode in the following scenario:
• Simple Flow with a single task that maps over items, e.g. 50 items
• Long-running Dask cluster in adaptive mode with a minimum of 1 worker always running and work stealing disabled
• Kick off a Flow run
• One mapped task (i.e. a mapped task for 1 item) begins running on the 1 worker that is immediately available
• Other workers begin to start up (takes several minutes since we use the k8s cluster autoscaler to request new spot instances in AWS)
• Once new workers are available they don't seem to get allocated tasks, and we notice that only one worker is busy
Do we need to ensure Dask workers are already running and available prior to starting a Flow with a single mapped task? Said another way, does disabling work stealing prevent mapped tasks (that haven't started running) from being allocated to Dask workers that started after the mapped tasks have been submitted to the scheduler?
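For reference, a minimal sketch of the setup described above, using the Prefect 0.x API current at the time. The scheduler address, item count, and task body are placeholders, and work stealing is assumed to be disabled on the scheduler side rather than in this code.

```python
# Minimal sketch of the scenario: one task mapped over ~50 items, run against
# a long-running adaptive Dask cluster. Names and addresses are placeholders.
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def process(item):
    # Stand-in for the real per-item work.
    return item * 2

with Flow("single-mapped-task") as flow:
    results = process.map(list(range(50)))  # e.g. 50 items

# Point at the existing cluster; work stealing is assumed disabled on the
# scheduler itself (e.g. DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False).
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```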

Chris White

01/21/2020, 9:19 PM
Hey @Joe Schmid - sorry for the delay in response; that’s a really good question. If you’re OK with a few tasks potentially running twice, you could try turning work stealing on and seeing whether the scheduled mapped tasks get re-allocated (I expect that they would). Because we use client.map to submit all mapped tasks simultaneously, I think this might be the only way to leverage both adaptive worker scaling and single-task mapping.
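To make the mechanics concrete, here is a rough illustration (not Prefect's internal code) of the submission pattern mentioned above: all mapped futures go to the scheduler at once, so work queued before new workers join only moves to them if the scheduler rebalances it, e.g. through work stealing. The function and scheduler address are placeholders.

```python
# Rough illustration only -- not Prefect internals.
from distributed import Client

def process(item):
    return item * 2

client = Client("tcp://dask-scheduler:8786")    # assumed scheduler address
futures = client.map(process, list(range(50)))  # all 50 tasks submitted up front
results = client.gather(futures)                # blocks until every task finishes
```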

Joe Schmid

01/21/2020, 9:42 PM
Thanks @Chris White, that makes sense. We'll think a bit more about this -- in some cases (e.g. data science experiments) there's no harm in tasks running twice, while in other cases (ETL where tasks aren't idempotent) there could be. Today we (mostly) share a Dask cluster per environment, but we could configure one cluster with work stealing off and one with it on, etc. Either way, I think we can just plan for this and address it on our side.
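As a sketch of the per-cluster toggle being discussed: the config key below is Dask's real work-stealing setting, though in practice it has to be in effect in the scheduler's own process (e.g. via config file or environment variable before the scheduler starts), not set from client code.

```python
import dask

# "Stealing OK" cluster, e.g. for data science experiments where a task
# occasionally running twice is harmless.
dask.config.set({"distributed.scheduler.work-stealing": True})

# Strict cluster for non-idempotent ETL work would use False instead:
# dask.config.set({"distributed.scheduler.work-stealing": False})
```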

Chris White

01/21/2020, 10:29 PM
Interesting - we might want to sync up on how to handle this entirely within Prefect. We could improve our version locking mechanism to recover more gracefully from an attempted second run; in that case you could enable version locking on your ETL flows (as a Prefect setting) and not on your ML flows, and even if Dask submits your ETL jobs twice we could try to recover the latest Prefect State / data payloads instead of the current behavior, which is to essentially end the run with a ClientError.