Thread
#prefect-community

    Joe Schmid

    3 years ago
    Question on using an external Dask cluster (DaskExecutor) running in adaptive mode -- we notice that running a Flow that maps over elements (say 100 elements) only seems to use the number of Dask workers that existed at the time the flow run started. We would have expected the Dask cluster to begin scaling up workers (since it's in adaptive mode) and starting to run more Prefect tasks in parallel. I'm not sure we've really characterized this behavior well, but we've observed it enough that it seems to be a pattern. (We have a notebook that - using the k8s APIs - plots the number of requested workers, running workers, & nodes and could show some of what we're seeing if it would help.)

    josh

    3 years ago
    For my understanding you’re running into a scenario where you have a dask cluster in adaptive mode and (for example) you have N workers, start the flow run, cluster scales up to M workers, but it is still only using the original N?

    Brett Naul

    3 years ago
    could this be related to work stealing being disabled? the prefect docs say to do so but it does seem like that limits the effectiveness of an adaptive cluster

    Joe Schmid

    3 years ago
    @josh what we see is that the cluster doesn't even try to scale up beyond the original N workers. There are a lot of available (Pending) Prefect Tasks but in our case only 2 dask workers running 4 Tasks (2 vCPUs for each worker).

    josh

    3 years ago
    Interesting theory @Brett Naul I think I’ve seen an issue similar to this in the past with not every dask worker being used from an adaptive standpoint. I was using the dask-kubernetes library and I can’t say for sure if I had work stealing disabled
    Ah @Joe Schmid so you’re seeing a scenario where dask isn’t scaling its amount of workers up. I have not seen that before!

    Joe Schmid

    3 years ago
    @Brett Naul We did recently disable work stealing, but saw the same behavior prior to that.

    josh

    3 years ago
    Are you using dask-kubernetes for your cluster?

    Joe Schmid

    3 years ago
    Yup, running on AWS EKS.
    Specifically, using KubeCluster from dask-kubernetes for the adaptive part.

    josh

    3 years ago
    Interesting, I wonder if the dask scheduler thinks it doesn’t need to create more workers for itself. I’m trying to recreate this on a cluster to see if I get the same issue

    Joe Schmid

    3 years ago
    In case it helps, one aspect of the behavior we observed is that N Prefect tasks start running initially where N = w * c (w = number of initial Dask workers, c = vCPUs which is the same as nthreads that we start the workers with)
    e.g. 4 tasks for 2 workers with 2 vCPU
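    As a rough illustration of that arithmetic (a sketch only, not Prefect or Dask code; the function name is hypothetical):

```python
def initial_task_slots(n_workers: int, nthreads_per_worker: int) -> int:
    """Upper bound on tasks running at once: one task per worker thread."""
    return n_workers * nthreads_per_worker


# The example from this thread: 2 workers, each started with nthreads=2
print(initial_task_slots(2, 2))  # 4
```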

    josh

    3 years ago
    There’s a couple things I think could be happening:
    1. It’s starting with the amount of processes available (2 workers w/ 2 CPUs so 4 total) and sticking with that so it doesn’t think it needs to create more workers (e.g. only ever trying to do 4 things at once)
    2. The mapped tasks aren’t trying to be executed in parallel but instead are happening sequentially and therefore dask isn’t creating more workers because it doesn’t think it needs to
    cc @Chris White
    I think I was able to replicate what you’re experiencing. I made a flow:
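    from prefect import Flow, task
    from prefect.environments import DaskKubernetesEnvironment
    from prefect.environments.storage import Docker


    @task
    def first_task():
        # Produce 100 elements to map over
        return [100] * 100


    @task
    def compute(x):
        return x * 100


    with Flow(
        "dktest",
        # Let dask-kubernetes scale between 1 and 3 workers
        environment=DaskKubernetesEnvironment(min_workers=1, max_workers=3),
        storage=Docker(registry_url="my_registry", image_name="flows"),
    ) as flow:
        one = first_task()
        result = compute.map(one)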
    This flow uses the DaskKubernetesEnvironment which simply uses dask-kubernetes to execute the flow on my cluster. When I ran this (with a single map) I never witnessed a second or third worker being created as it was chewing through each mapped task and they were being executed sequentially. However, I changed the flow to contain some parallel branches from the first_task:
    one = first_task()
    result = compute.map(one)
    result2 = compute.map(one)
    result3 = compute.map(one)
    Then I saw more than one dask worker being dynamically created because I believe it was now trying to chew through the maps simultaneously.

    Joe Schmid

    3 years ago
    @josh thanks for the test case! We can give this a shot. Much appreciated.

    Chris White

    3 years ago
    Just catching up - this is interesting; we could also explore replicating this with Dask directly and file a bug report; under the hood, Prefect makes a call to Client.map (https://distributed.dask.org/en/latest/api.html#distributed.Client.map); in Prefect, this call is actually made with a worker_client, which could also be relevant

    Joe Schmid

    3 years ago
    @Chris White it does seem like a potential bug/issue. At a minimum, it would certainly be desirable behavior to scale up workers when mapping.
    If we can help provide more details, definitely let us know but it sounds like you already have a nice minimal example that demonstrates the issue.

    Chris White

    3 years ago
    yup yup! I’ll sync up with @josh and try to recreate with Dask - we’ll keep you in the loop!
    Hey @Joe Schmid and @Jie Lou, just wanted to update you on this: @josh and I spent some time going down the rabbit hole, and we could sort-of recreate the issue, but not make much progress as to the cause. If the goal is resource conservation, at this moment my recommendation would be to use DaskKubernetesEnvironment but set the minimum number of workers to the amount you really want at the highest capacity, and at least the cluster will disappear when your run is complete. We’re still trying to figure out why the large number of pending mapped tasks isn’t triggering a scale-up though, and will let you know when we learn something new!

    Jie Lou

    3 years ago
    @Chris White Sounds good. Thanks for the update, Chris.

    Chris White

    3 years ago
    @Joe Schmid / @Jie Lou are the Tasks that you are mapping very fast tasks? What’s a typical duration of the run for these tasks?

    Joe Schmid

    3 years ago
    @Chris White they're quite slow -- often minutes.

    Chris White

    3 years ago
    Interesting. OK so that is not what I was expecting; just to give you some insight, while your tasks are running, you can do the following:
    from distributed import Client
    
    c = Client("<tcp://scheduler-address>")
    # run_on_scheduler only injects the scheduler into a parameter named dask_scheduler
    c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.adaptive_target())
    and call this function over and over as your Tasks progress; (note you might need to exec into one of your pods to have access to the scheduler). This is the endpoint dask-k8s calls to determine whether scaling should occur — we found that for tasks whose runtime was on the same order as the network communication cost, this function would return 1, but for longer tasks it would very quickly begin returning large values — not a solution, but just so you know the approach we were taking
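    One way to picture why short tasks yield a target of 1 while long tasks yield large values is the simplified model below. It assumes the scheduler divides the total expected runtime of the queued work by a target duration and never asks for more than one unit of capacity per task. The function name, the 5-second target duration, and the omission of memory-based scaling are all assumptions for illustration; this is not Dask's actual implementation.

```python
import math


def estimated_adaptive_target(
    n_tasks: int,
    avg_task_seconds: float,
    target_duration_s: float = 5.0,  # assumed value, for illustration only
) -> int:
    """Rough model of an occupancy-based scaling target."""
    # Total expected runtime of everything waiting or running
    total_occupancy = n_tasks * avg_task_seconds
    # Capacity needed to finish that work within the target duration
    wanted = math.ceil(total_occupancy / target_duration_s)
    # A handful of long tasks should not request more capacity than there are tasks
    return min(wanted, n_tasks)


print(estimated_adaptive_target(100, 0.01))  # fast tasks
print(estimated_adaptive_target(100, 60.0))  # minute-long tasks
```

    With 100 mapped tasks, this model gives 1 for 10 ms tasks and 100 for one-minute tasks, which matches the pattern described above.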

    Joe Schmid

    3 years ago
    The update from our side is that we put our external Dask cluster in non-adaptive mode this morning and saw significantly better reliability today. (We did some work to make it easy to dynamically adjust our Dask cluster, e.g. adaptive on/off, set min/max # workers, set target # workers for non-adaptive, etc.)
    @Chris White that's helpful on running scheduler.adaptive_target(); we can definitely do that. What we'll likely do in the short term is avoid using adaptive mode for now to remove that as a variable as we do lots of flow runs mapping over many elements.
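    Sampling the target repeatedly during a flow run could be scripted roughly as follows. This is a sketch under assumptions: both function names are hypothetical, the polling helper takes any zero-argument callable so it can be tried without a cluster, and the Dask-backed getter assumes the distributed package is installed and the scheduler address is reachable.

```python
import time
from typing import Callable, List


def poll_adaptive_target(
    get_target: Callable[[], int],
    samples: int = 10,
    interval_s: float = 5.0,
) -> List[int]:
    """Call get_target() `samples` times, sleeping interval_s between calls."""
    readings = []
    for i in range(samples):
        value = get_target()
        print(f"sample {i}: adaptive_target = {value}")
        readings.append(value)
        if i < samples - 1:
            time.sleep(interval_s)
    return readings


def make_dask_target_getter(scheduler_address: str) -> Callable[[], int]:
    """Build a getter that asks a live Dask scheduler for its adaptive target."""
    # Imported lazily so the poller above has no hard dependency on distributed
    from distributed import Client

    client = Client(scheduler_address)
    # The scheduler is injected because the parameter is named dask_scheduler
    return lambda: client.run_on_scheduler(
        lambda dask_scheduler: dask_scheduler.adaptive_target()
    )
```

    For example, poll_adaptive_target(make_dask_target_getter(address), samples=20) would log twenty readings, where address is the scheduler address of your own cluster.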

    Chris White

    3 years ago
    That sounds good!