
Levi Leal

03/12/2021, 2:29 PM
Trying to add DaskExecutor to a flow I run on K8s. I'm currently using KubernetesRun and Docker for storage. When I add DaskExecutor, I get an error while building the flow image. Here is the code I'm using:
import prefect
import time
from prefect import task, Flow, Parameter, unmapped
from prefect.storage import Docker
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from dask_kubernetes import KubeCluster, make_pod_spec



run_config = KubernetesRun(
    cpu_request=1,
    image_pull_secrets=['ghcr']
)

storage = Docker(
    image_name='flow_dask',
    registry_url="ghcr.io/lime-energy",
    python_dependencies=['dask-kubernetes']
)

executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
    adapt_kwargs={"minimum": 5, "maximum": 10}
)


@task
def get_list():
    return range(1, 31)

@task
def print_me(n, prefix):
    logger = prefect.context.get('logger')
    time.sleep(2)
    logger.info(f'{prefix} - {n}')



with Flow('Dask Flow', run_config=run_config, storage=storage, executor=executor) as flow:
    l = get_list()
    print_me.map(l, unmapped('Num'))


def register():
    flow.register(
        project_name='lime',
        labels=['s3-flow-storage']
    )

if __name__ == "__main__":
    register()
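The mapped call `print_me.map(l, unmapped('Num'))` creates one `print_me` run per element of `l`, passing the same `'Num'` prefix to every copy. A parallel executor lets the 30 two-second sleeps overlap. As a rough stdlib-only illustration (not Prefect's mapping machinery), the sketch below uses a thread pool as a stand-in for Dask workers. `simulate_map` and its `workers` and `delay` parameters are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def print_me(n, prefix, delay):
    """Stand-in for the flow's task: sleep, then return the log line."""
    time.sleep(delay)
    return f"{prefix} - {n}"


def simulate_map(items, prefix, workers, delay=2.0):
    """Run print_me once per item, reusing the same (unmapped) prefix.

    Hypothetical helper: a thread pool plays the role of the Dask workers.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in input order.
        return list(pool.map(lambda n: print_me(n, prefix, delay), items))


if __name__ == "__main__":
    start = time.perf_counter()
    lines = simulate_map(range(1, 31), "Num", workers=10, delay=0.1)
    elapsed = time.perf_counter() - start
    print(lines[:3])
    # 30 tasks on 10 workers run in 3 waves of 0.1 s each,
    # so this takes roughly 0.3 s instead of ~3 s on one worker.
    print(f"{elapsed:.2f}s")
```

With the flow's real `time.sleep(2)`, the same arithmetic gives about 60 s on a single worker versus about 6 s spread over 10 workers, ignoring scheduling and pod start-up overhead.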
If I remove the executor from this flow, it registers properly.
Is this the proper way of using dask on K8s?
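Passing `cluster_class=lambda: KubeCluster(...)` instead of an already-built cluster defers cluster creation. Our understanding is that `DaskExecutor` calls `cluster_class` only when the executor starts during a flow run, so `prefect.context.image` is read at run time rather than while the flow is defined or built. The stdlib sketch below models only that deferral. `LazyExecutor`, `FakeCluster`, the `context` dict and the image tag are hypothetical stand-ins, not Prefect or dask-kubernetes APIs.

```python
from typing import Any, Callable, Dict, Optional


class FakeCluster:
    """Hypothetical stand-in for KubeCluster: just records its settings."""

    def __init__(self, image: str) -> None:
        self.image = image
        self.adapt_kwargs: Dict[str, Any] = {}

    def adapt(self, **kwargs: Any) -> None:
        self.adapt_kwargs = kwargs


class LazyExecutor:
    """Hypothetical executor that builds its cluster only in start()."""

    def __init__(self, cluster_class: Callable[[], FakeCluster],
                 adapt_kwargs: Optional[Dict[str, Any]] = None) -> None:
        # Only the factory is stored; no cluster exists yet.
        self.cluster_class = cluster_class
        self.adapt_kwargs = adapt_kwargs or {}
        self.cluster: Optional[FakeCluster] = None

    def start(self) -> FakeCluster:
        # Called at flow-run time, so the factory sees the runtime context.
        self.cluster = self.cluster_class()
        if self.adapt_kwargs:
            self.cluster.adapt(**self.adapt_kwargs)
        return self.cluster


# Hypothetical runtime context; empty while the flow is being defined.
context: Dict[str, Any] = {}

executor = LazyExecutor(
    cluster_class=lambda: FakeCluster(image=context["image"]),
    adapt_kwargs={"minimum": 5, "maximum": 10},
)
# Defining the executor never read context["image"], so no KeyError here.

# At run time the runner fills in the image (hypothetical tag) and starts.
context["image"] = "ghcr.io/lime-energy/flow_dask:latest"
cluster = executor.start()
print(cluster.image, cluster.adapt_kwargs)
```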
I'd like to avoid having a static dask worker.
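Instead of a fixed worker count, `adapt_kwargs={"minimum": 5, "maximum": 10}` asks the cluster to scale adaptively between 5 and 10 workers. Dask's real adaptive policy estimates load from task durations and memory. The sketch below models only the bounds: a hypothetical `target_workers` function requests one worker per `tasks_per_worker` pending tasks and clamps the result to `[minimum, maximum]`.

```python
import math


def target_workers(pending_tasks: int, minimum: int, maximum: int,
                   tasks_per_worker: int = 1) -> int:
    """Toy adaptive policy (hypothetical, not Dask's algorithm).

    Ask for enough workers to cover the pending tasks, but never fewer
    than `minimum` or more than `maximum`.
    """
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    wanted = math.ceil(pending_tasks / tasks_per_worker)
    return max(minimum, min(maximum, wanted))


if __name__ == "__main__":
    # The flow maps print_me over 30 items with adapt bounds 5..10.
    for pending in (0, 3, 8, 30):
        print(pending, "->", target_workers(pending, minimum=5, maximum=10))
```

In this toy model, the lower bound keeps five workers requested even when nothing is pending, and the upper bound caps the 30 mapped tasks at 10 workers.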
I found that if I change my flow to use DaskKubernetesEnvironment, it works. My problem with this is that the Prefect docs say this environment is deprecated.
from prefect.environments import DaskKubernetesEnvironment

with Flow('Dask Flow', run_config=run_config, storage=storage, executor=DaskExecutor()) as flow:
    l = get_list()
    print_me.map(l, unmapped('Num'))

flow.environment = DaskKubernetesEnvironment(
    min_workers=10, max_workers=15,
)
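The environment's `min_workers`/`max_workers` appear to play the role that the `adapt_kwargs` bounds play for `DaskExecutor`. This correspondence is an assumption worth checking against the Prefect docs on moving from environments to run configs. Under that assumption, the hypothetical helper below turns the environment's bounds into the dict the executor-based version of the flow would pass as `adapt_kwargs`.

```python
from typing import Dict


def adapt_kwargs_from_env(min_workers: int, max_workers: int) -> Dict[str, int]:
    """Hypothetical helper: map DaskKubernetesEnvironment-style worker bounds
    to the adapt_kwargs dict given to DaskExecutor.

    Assumes min_workers/max_workers mean the same as Dask's adaptive
    minimum/maximum.
    """
    if not 0 <= min_workers <= max_workers:
        raise ValueError("expected 0 <= min_workers <= max_workers")
    return {"minimum": min_workers, "maximum": max_workers}


if __name__ == "__main__":
    # Bounds from the DaskKubernetesEnvironment example above.
    print(adapt_kwargs_from_env(min_workers=10, max_workers=15))
```

The resulting dict would slot into `DaskExecutor(cluster_class=..., adapt_kwargs=...)`, which keeps the non-deprecated `KubernetesRun` plus `DaskExecutor` setup.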