Matt Alhonte
10/19/2021, 9:54 PMrun_config
based on params, and then kicked off from the Outer Flow
2. Spinning up a Dask Cluster from within the Flow and submitting tasks to it (and the Flow itself just runs in a small container) https://docs.prefect.io/core/idioms/resource-manager.html https://docs.prefect.io/api/latest/tasks/resources.html
3. Maybe start experimenting with Adaptive Scaling for Dask Clusters?Zanie
DaskExecutor
cluster_class
kwarg can be an arbitrary callable so that's the first place I'd think of to adjust the cluster.Kevin Kho
RunConfig
but more about the Executor
. You can use number 2 like thisMatt Alhonte
10/19/2021, 9:59 PMZanie
import prefect as pf
from prefect.executors import DaskExecutor
def gen_cluster():
from distributed import LocalCluster
if pf.context.parameters["x"] < 10:
return LocalCluster()
else:
return LocalCluster()
@pf.task
def my_task(x):
pass
with pf.Flow("example", executor=DaskExecutor(cluster_class=gen_cluster)) as flow:
x = pf.Parameter("x", 5)
my_task(x)
flow.run()
Matt Alhonte
10/19/2021, 10:03 PMZanie
Matt Alhonte
10/19/2021, 10:31 PMZanie
Matt Alhonte
10/19/2021, 10:33 PMKevin Kho
Matt Alhonte
10/20/2021, 6:32 PMKevin Kho
Matt Alhonte
10/20/2021, 6:37 PM