# ask-community
m
Hello! I was wondering what are some of the patterns people use to set up flows with a configurable number of workers for Dask? Given that you need to pass the executor to a flow, it seems like you can't use parameters (as far as I can tell)? Right now we're using KubeCluster to spin up ephemeral Dask clusters in K8s, but our n_workers is hardcoded; we'd like to configure it per environment, or possibly based on task inputs in the future. Thanks!
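For reference, a minimal sketch of the kind of static setup described above, assuming Prefect 1.x's DaskExecutor with dask-kubernetes; the flow, task, and worker count are illustrative:
```python
from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def do_work(x):
    return x * 2


with Flow("example-flow") as flow:
    do_work.map(list(range(10)))

# n_workers is fixed when the flow is defined/registered; it cannot come from a
# Parameter, because the executor is configured outside of any flow run.
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"n_workers": 4},
)
```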
k
Hi @Martin, I think you are exactly right that this can't be done. I think the best you can do is try to autoscale? Maybe you can do this with Resource Managers?
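For context, the Resource Manager pattern lets a flow create and tear down a Dask cluster inside the flow itself, so the cluster size can come from a Parameter. A rough sketch, assuming Prefect 1.x's resource_manager decorator and dask-kubernetes (the DaskCluster and do_work names are illustrative); note that with this pattern the tasks use the Dask client directly rather than running on the flow's executor:
```python
from dask.distributed import Client
from dask_kubernetes import KubeCluster
from prefect import Flow, Parameter, resource_manager, task


@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        # Spin up an ephemeral cluster sized by the flow parameter.
        self.cluster = KubeCluster(n_workers=self.n_workers)
        return Client(self.cluster)

    def cleanup(self, client):
        client.close()
        self.cluster.close()


@task
def do_work(client):
    # Submit work through the Dask client created by the resource manager.
    return client.submit(sum, range(100)).result()


with Flow("resource-manager-example") as flow:
    n_workers = Parameter("n_workers", default=4)
    with DaskCluster(n_workers=n_workers) as client:
        do_work(client)
```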
m
Thanks, will check that out! With regards to autoscaling, is that basically using min_workers/max_workers? And is the actual scaling driven by the number of mapped tasks, or something else?
k
Yes exactly, and yes, I believe so. Or basically the processes that need to be started.
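As a sketch of what that adaptive setup can look like, assuming Prefect 1.x's DaskExecutor (the bounds below are illustrative), adapt_kwargs is passed through to Dask's adaptive scaling, which adds or removes workers based on the pending work:
```python
from prefect.executors import DaskExecutor

# The cluster starts small; Dask's adaptive scaling grows it up to the maximum
# as mapped tasks queue up, then shrinks it again when the load drops.
executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    adapt_kwargs={"minimum": 1, "maximum": 10},
)
```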
e
Co-worker of Martin here--I was independently investigating this and came up with a workable (if a little hacky) solution that involves subclassing DaskExecutor. It’s pretty short, so posting in case it’s useful to others.
```python
from prefect.executors import DaskExecutor


class MyExecutor(DaskExecutor):
    def submit(self, fn, *args, extra_context=None, **kwargs):
        # Tasks tagged "_meta" are run synchronously on the flow-runner process
        # instead of being submitted to the Dask cluster.
        if extra_context and "_meta" in extra_context.get("task_tags", []):
            # First wait on any upstream tasks, then run the task synchronously.
            upstream_states = self.wait(kwargs.pop("upstream_states", {}))
            upstream_mapped_states = self.wait(
                kwargs.pop("upstream_mapped_states", {})
            )
            # Inject the executor into the Prefect context, so we can, for example,
            # call a scaling operation on the Dask cluster from inside the task.
            kwargs["context"] = dict(kwargs["context"], **{"_meta": {"executor": self}})
            return fn(
                *args,
                upstream_states=upstream_states,
                upstream_mapped_states=upstream_mapped_states,
                **kwargs,
            )

        return super().submit(fn, *args, extra_context=extra_context, **kwargs)
```
You can then interact with the executor in a task by adding the "_meta" tag to it. We use this to achieve a parameterized worker count:
```python
from prefect import Parameter, task


@task(tags=["_meta"])
def scale_cluster(n_workers):
    # Runs synchronously on the flow runner (see MyExecutor above), so it can reach
    # the executor injected into the context and resize the Dask cluster.
    from prefect import context

    context._meta.get("executor").client.cluster.scale(n_workers)


def parameterize_dask_workers(**task_kwargs):
    # Call inside a `with Flow(...)` block: exposes a Parameter for the worker count
    # and wires it into the scaling task.
    n_workers = Parameter("_meta_n_workers", **task_kwargs)
    scale_cluster(n_workers)
```
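To show how these pieces might fit together, a sketch assuming Prefect 1.x; the flow name, heavy_work task, and cluster settings are illustrative:
```python
from prefect import Flow, task


@task
def heavy_work(x):
    return x * 2


with Flow("configurable-dask-flow") as flow:
    # Exposes a "_meta_n_workers" Parameter and scales the cluster at run time.
    parameterize_dask_workers(default=4)
    heavy_work.map(list(range(100)))

flow.executor = MyExecutor(cluster_class="dask_kubernetes.KubeCluster")
```
Because scale_cluster has no declared dependency on heavy_work here, Prefect may start the mapped tasks before the resize has taken effect; if that matters, the heavy tasks could be made downstream of the scaling task.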
k
@Marvin archive “Parameterized workers for executor”
k
This is nice work, @Evan!