Evan
09/03/2021, 9:54 PM
from prefect.executors import DaskExecutor

class MyExecutor(DaskExecutor):
    def submit(self, fn, *args, extra_context=None, **kwargs):
        # Tasks tagged "_meta" are run synchronously in the flow-runner process
        # instead of being submitted to the Dask cluster.
        if extra_context and "_meta" in extra_context.get("task_tags", []):
            # First wait on any upstream tasks, then run the task synchronously.
            upstream_states = self.wait(
                {e: state for e, state in kwargs.pop("upstream_states", {}).items()}
            )
            upstream_mapped_states = self.wait(
                {e: state for e, state in kwargs.pop("upstream_mapped_states", {}).items()}
            )
            # Inject the executor into the Prefect context so the task can, for
            # example, call a scaling operation on the Dask cluster.
            kwargs["context"] = dict(kwargs["context"], _meta={"executor": self})
            return fn(
                *args,
                upstream_states=upstream_states,
                upstream_mapped_states=upstream_mapped_states,
                **kwargs,
            )
        return super().submit(fn, *args, extra_context=extra_context, **kwargs)
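For context, a minimal sketch of how this subclass might be attached to a flow, assuming Prefect 1.x (where Flow accepts an executor argument); the flow name and cluster_kwargs value below are illustrative, not from the thread:

from prefect import Flow

# Tasks tagged "_meta" take the synchronous branch of MyExecutor.submit();
# everything else is submitted to the Dask cluster as usual.
flow = Flow("scaling-example", executor=MyExecutor(cluster_kwargs={"n_workers": 2}))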
Evan
09/03/2021, 9:55 PM
from prefect import Parameter, task

@task(tags=["_meta"])
def scale_cluster(n_workers):
    # The "_meta" tag routes this through MyExecutor.submit(), which injected
    # the executor into the Prefect context.
    from prefect import context
    context._meta.get("executor").client.cluster.scale(n_workers)

def parameterize_dask_workers(**task_kwargs):
    # Call inside a Flow block to expose the worker count as a Parameter.
    n_workers = Parameter("_meta_n_workers", **task_kwargs)
    scale_cluster(n_workers)
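A hedged end-to-end sketch, again assuming Prefect 1.x; the flow name, default worker count, and run-time parameter value are illustrative:

from prefect import Flow

with Flow("dask-scaling", executor=MyExecutor(cluster_kwargs={"n_workers": 2})) as flow:
    # Exposes a flow parameter named "_meta_n_workers" that feeds scale_cluster.
    parameterize_dask_workers(default=4)
    # ... downstream tasks that benefit from the resized cluster ...

flow.run(parameters={"_meta_n_workers": 8})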