Joe Schmid
09/10/2019, 8:19 PM

josh
09/10/2019, 8:26 PM

Brett Naul
09/10/2019, 8:32 PM

Joe Schmid
09/10/2019, 8:37 PM

josh
09/10/2019, 8:37 PM
dask-kubernetes library and I can’t say for sure if I had work stealing disabled

Joe Schmid
09/10/2019, 8:40 PM

josh
09/10/2019, 8:40 PM

Joe Schmid
09/10/2019, 8:40 PM

josh
09/10/2019, 9:00 PM

Joe Schmid
09/10/2019, 9:04 PM

josh
09/10/2019, 9:11 PM
from prefect import Flow, task
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

@task
def first_task():
    return [100] * 100

@task
def compute(x):
    return x * 100

with Flow(
    "dktest",
    environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3),
    storage=Docker(registry_url="my_registry", image_name="flows"),
) as flow:
    one = first_task()
    result = compute.map(one)

This flow uses the DaskKubernetesEnvironment, which simply uses dask-kubernetes to execute the flow on my cluster. When I ran this (with a single map) I never saw a second or third worker being created: it chewed through the mapped tasks one at a time, executing them sequentially. However, when I changed the flow to contain some parallel branches from `first_task`:
one = first_task()
result = compute.map(one)
result2 = compute.map(one)
result3 = compute.map(one)
Then I saw more than one Dask worker being dynamically created, because I believe it was now trying to chew through the maps simultaneously.
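
(As an aside, here is a minimal standalone sketch of the same question posed directly to dask-kubernetes, outside Prefect: does adaptive mode scale past one worker when many slow, independent tasks are pending? It assumes the classic KubeCluster API and a placeholder image.)

import time
from dask_kubernetes import KubeCluster, make_pod_spec
from distributed import Client

# Placeholder pod spec / image -- adjust for your own cluster
pod_spec = make_pod_spec(image="daskdev/dask:latest")
cluster = KubeCluster(pod_spec)
cluster.adapt(minimum=1, maximum=3)  # same bounds as min_workers/max_workers above
client = Client(cluster)

def slow(x):
    time.sleep(10)  # long enough that task runtime dominates communication cost
    return x * 100

# 100 pending tasks should push the adaptive target above 1 worker
futures = client.map(slow, range(100))
print(client.gather(futures)[:5])
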
Joe Schmid
09/10/2019, 9:45 PM

Chris White
09/10/2019, 9:50 PM
Client.map (https://distributed.dask.org/en/latest/api.html#distributed.Client.map); in Prefect, this call is actually made with a worker_client, which could also be relevant
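
(A rough sketch of the worker_client pattern from the distributed docs, not Prefect's actual internals, showing how a task already running on a worker can itself submit mapped work:)

from distributed import worker_client

def parent_task(items):
    # Runs on a Dask worker; worker_client secedes from the worker's thread
    # pool while this task waits on the child futures it submits.
    with worker_client() as client:
        futures = client.map(lambda x: x * 100, items)
        return client.gather(futures)
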
Joe Schmid
09/10/2019, 10:18 PM

Chris White
09/10/2019, 10:20 PM
DaskKubernetesEnvironment, but set the minimum number of workers to the amount you really want at the highest capacity, and at least the cluster will disappear when your run is complete. We’re still trying to figure out why the large number of pending mapped tasks isn’t triggering a scale-up though, and will let you know when we learn something new!
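
(For example, with an arbitrary worker count of 6 standing in for whatever capacity is actually needed, that workaround might look like:)

from prefect.environments import DaskKubernetesEnvironment

# Pin min_workers to the full desired capacity so the run starts at size
# instead of waiting on an adaptive scale-up (6 is just an example value).
environment = DaskKubernetesEnvironment(min_workers=6, max_workers=6)
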
Jie Lou
09/12/2019, 10:23 PM

Chris White
09/12/2019, 10:48 PM

Joe Schmid
09/12/2019, 10:48 PM

Chris White
09/12/2019, 10:53 PM
from distributed import Client

# run_on_scheduler only injects the scheduler into a parameter named dask_scheduler
c = Client("tcp://scheduler-address")
c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.adaptive_target())

and call this function over and over as your Tasks progress (note: you might need to exec into one of your pods to have access to the scheduler). This is the endpoint dask-k8s calls to determine whether scaling should occur. We found that for tasks whose runtime was on the same order as the network communication cost, this function would return 1, but for longer tasks it would very quickly begin returning large values. Not a solution, but just so you know the approach we were taking.
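
(A small sketch of that approach, polling the same call on an interval while a flow run is in progress; the scheduler address, loop count, and 5-second interval are placeholders:)

import time
from distributed import Client

c = Client("tcp://scheduler-address")

# Sample the scheduler's desired worker count repeatedly while mapped tasks run
for _ in range(60):
    target = c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.adaptive_target())
    print("adaptive target (desired workers):", target)
    time.sleep(5)
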
Joe Schmid
09/12/2019, 10:54 PM
scheduler.adaptive_target(): we can definitely do that. What we'll likely do in the short term is avoid using adaptive mode for now, to remove that as a variable, since we do lots of flow runs mapping over many elements.
Chris White
09/12/2019, 10:58 PM