# Levi Leal
# 03/12/2021, 2:29 PM
import prefect
import time
from prefect import task, Flow, Parameter, unmapped
from prefect.storage import Docker
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from dask_kubernetes import KubeCluster, make_pod_spec
# KubernetesRun configuration for the flow: requests one CPU and names the
# "ghcr" image-pull secret.
run_config = KubernetesRun(
    cpu_request=1,
    image_pull_secrets=["ghcr"],
)
# Docker storage for the flow image; dask-kubernetes is listed as a Python
# dependency of the image (this file imports KubeCluster from it).
storage = Docker(
    image_name="flow_dask",
    # Plain registry host/namespace. The pasted value had been wrapped in
    # chat link markup ("<http://...|...>"), which is not a usable registry.
    registry_url="ghcr.io/lime-energy",
    python_dependencies=["dask-kubernetes"],
)
# Dask executor backed by a KubeCluster.
executor = DaskExecutor(
    # Zero-argument factory: builds a KubeCluster whose pod spec uses the image
    # read from prefect.context at call time.
    cluster_class=lambda: KubeCluster(
        make_pod_spec(image=prefect.context.image)
    ),
    # NOTE(review): presumably the adaptive-scaling bounds for the cluster;
    # confirm against the DaskExecutor documentation.
    adapt_kwargs={"minimum": 5, "maximum": 10},
)
@task
def get_list():
    """Return the integers 1 through 30 (inclusive) as a ``range``."""
    return range(1, 31)
@task
def print_me(n, prefix):
    """Log ``"<prefix> - <n>"`` at INFO level after a two-second pause.

    Args:
        n: Value placed after the separator in the message.
        prefix: Text placed before the separator in the message.
    """
    logger = prefect.context.get('logger')
    time.sleep(2)
    # The pasted source had this call wrapped in chat link markup
    # ("<http://logger.info|logger.info>"); it is a plain logger.info call.
    logger.info(f'{prefix} - {n}')
# Build the flow: the result of get_list is mapped over by print_me, with the
# constant 'Num' passed unmapped as the prefix for every mapped call.
with Flow(
    'Dask Flow',
    run_config=run_config,
    storage=storage,
    executor=executor,
) as flow:
    l = get_list()
    print_me.map(l, unmapped('Num'))
def register():
    """Register the module-level ``flow`` under project 'lime'.

    The flow is registered with the single label 's3-flow-storage'.
    """
    flow.register(
        project_name='lime',
        labels=['s3-flow-storage'],
    )
# Register the flow only when this file is executed as a script.
if __name__ == "__main__":
    register()
# Second variant of the flow, using a default DaskExecutor plus a
# DaskKubernetesEnvironment. It rebinds the module-level name `flow`; because
# it comes after the __main__ guard, register() in that guard has already run
# against the first flow by the time this executes.
# NOTE(review): this looks like a separate snippet pasted from the same chat
# thread rather than part of the script above - confirm whether it belongs here.

# DaskKubernetesEnvironment was used below without being imported anywhere in
# the visible source; imported here so the name resolves.
from prefect.environments import DaskKubernetesEnvironment

with Flow(
    'Dask Flow',
    run_config=run_config,
    storage=storage,
    executor=DaskExecutor(),
) as flow:
    l = get_list()
    print_me.map(l, unmapped('Num'))

# NOTE(review): environments are the older configuration style; verify that the
# installed Prefect version accepts `environment` together with `run_config`
# and `executor` on the same flow.
flow.environment = DaskKubernetesEnvironment(
    min_workers=10,
    max_workers=15,
)