ciaran
05/05/2021, 1:36 PM
`KubernetesRun` and `DaskExecutor` (`KubeCluster`) on AKS.
I submit my flow and it runs; I can see the Dask scheduler pod stand up, but then nothing happens.
For example, the flow in the image I provided is a simple flow that has been running for 17+ (now 20+) hours 😮
You can find the flow definition here https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/flow_test/manual_flow.py
And you can find the agent conf here https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/prefect_agent_conf.yaml
ciaran
05/05/2021, 1:36 PM
Robin
05/05/2021, 1:45 PM
ciaran
05/05/2021, 1:46 PM
ciaran
05/05/2021, 1:46 PM
ciaran
05/05/2021, 1:46 PM
ciaran
05/05/2021, 1:47 PM
Robin
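05/05/2021, 1:48 PM
```python
import os

from dask_kubernetes import KubeCluster, make_pod_spec


def make_cluster():
    # Worker pods use the bakery image; flow_name is assumed to be
    # defined elsewhere in the flow module.
    return KubeCluster(
        make_pod_spec(
            image=os.environ["AZURE_BAKERY_IMAGE"],
            labels={
                "flow": flow_name
            },
            memory_limit=None,
            memory_request=None
        )
    )
```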
ciaran
05/05/2021, 1:49 PM
Robin
05/05/2021, 1:50 PM
ciaran
05/05/2021, 1:55 PM
ciaran
05/05/2021, 1:57 PM
Robin
05/05/2021, 1:58 PM
ciaran
05/05/2021, 1:59 PM
Mariia Kerimova
05/05/2021, 2:13 PM
ciaran
05/05/2021, 2:15 PM
ciaran
05/05/2021, 2:37 PM
Robin
05/05/2021, 3:09 PM
`dask` with e.g. `da.arange` or something similar by any chance?
ciaran
05/05/2021, 3:11 PM
```python
import prefect
@prefect.task
def say_hello():  # task name assumed; only the task body was posted
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"
```
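A minimal version of Robin's suggestion, checking plain dask with `da.arange` before involving Prefect, could look like this. It is a sketch rather than anything posted in the thread: `smoke_test` is a hypothetical name, and without a `distributed.Client` it runs on dask's local threaded scheduler.

```python
import dask.array as da


def smoke_test(n: int = 1_000_000, chunk: int = 100_000) -> int:
    """Sum the integers 0..n-1 with dask.array.

    A tiny job whose only purpose is to confirm that work actually gets
    scheduled and finishes. If a distributed Client is active, compute()
    sends the tasks to that Client's workers.
    """
    x = da.arange(n, chunks=chunk)   # lazy array split into chunks
    return int(x.sum().compute())    # triggers execution of the task graph
```

To exercise the KubeCluster instead, one option (assuming a cluster like the one `make_cluster()` returns) is to call `cluster.adapt(maximum=10)` or `cluster.scale(2)`, then create `Client(cluster)` from `dask.distributed` before calling `smoke_test()`. If no workers ever join, the `compute()` call would be expected to wait indefinitely, which resembles the symptom described at the top of the thread.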
Mariia Kerimova
05/05/2021, 3:32 PM
ciaran
05/05/2021, 3:36 PM
ciaran
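05/05/2021, 4:33 PM
```python
import os

from dask_kubernetes import make_pod_spec
from prefect.executors import DaskExecutor

# flow_name is assumed to be defined elsewhere in the flow module;
# pass this object as the flow's executor.
executor = DaskExecutor(
    # cluster class given as an import-path string
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={
        "pod_template": make_pod_spec(
            image=os.environ["AZURE_BAKERY_IMAGE"],
            labels={
                "flow": flow_name
            },
            memory_limit=None,
            memory_request=None,
        )
    },
    # scale the cluster adaptively, up to 10 workers
    adapt_kwargs={"maximum": 10}
)
```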
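Building on ciaran's fix, a small guard can refuse to build `DaskExecutor` arguments that set neither a fixed worker count nor adaptive scaling. This assumes, as the thread suspects, that leaving out both `n_workers` and `adapt_kwargs` left the `KubeCluster` with no workers. `build_dask_executor_kwargs` is a hypothetical helper; only `cluster_class`, `cluster_kwargs`, `pod_template`, `n_workers` and `adapt_kwargs` come from the thread. It only assembles a dict, so it can be checked without Prefect or dask-kubernetes installed.

```python
from typing import Any, Dict, Optional


def build_dask_executor_kwargs(
    pod_template: Any,
    n_workers: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> Dict[str, Any]:
    """Assemble keyword arguments for a KubeCluster-backed DaskExecutor.

    Hypothetical helper: raises if the resulting cluster would have
    neither a fixed worker count nor adaptive scaling.
    """
    if not n_workers and not max_workers:
        # Assumed failure mode from the thread: no workers, so no work runs.
        raise ValueError("set n_workers or max_workers so the cluster gets workers")

    cluster_kwargs: Dict[str, Any] = {"pod_template": pod_template}
    if n_workers:
        cluster_kwargs["n_workers"] = n_workers  # fixed number of worker pods

    kwargs: Dict[str, Any] = {
        "cluster_class": "dask_kubernetes.KubeCluster",  # import-path string
        "cluster_kwargs": cluster_kwargs,
    }
    if max_workers:
        kwargs["adapt_kwargs"] = {"maximum": max_workers}  # adaptive upper bound
    return kwargs
```

In a flow module this would be used as something like `DaskExecutor(**build_dask_executor_kwargs(make_pod_spec(...), max_workers=10))`, which mirrors the configuration ciaran posted. Failing early keeps a misconfigured flow from sitting idle for hours.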
ciaran
05/05/2021, 4:35 PM
`str` name of `KubeCluster`, `pod_template` in the `cluster_kwargs` (this was really not an obvious one), and finally the sprinkle of magic at the end was `adapt_kwargs`. So maybe not listing `n_workers` meant that dask just never started to schedule work, which is a bit funky. It's not a required parameter, so you'd assume it would have some kind of OK default.
ciaran
05/05/2021, 5:00 PM
Mariia Kerimova
05/05/2021, 5:33 PM
Kevin Kho
Jay Shah
05/09/2021, 4:23 AM
```python
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import KubernetesRun


@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")
    print('Hello, Cloud!')


with Flow("hello-world") as flow:
    say_hello()

flow.storage = GitLab(
    repo="repo/location",
    path="path_to_workflow.py",
    access_token_secret="secret"
)
flow.run_config = KubernetesRun(labels=['some-label'])
```