# ask-community
h
Here's another noob question from me; I can't get the Dask executor to pick up work from the Prefect k8s Agent / Cloud. Here's some output:
```
Using dask_endpoint=tcp://dask-scheduler.flows.svc:8786
Registering flow with label=['prod'] image=example/data-pipelines:cb40d2797195791a3cf195fa1906a1722222222
Registering... executor=DaskExecutor, run_config=KubernetesRun.
```
From this code:
```python
if args.dask:
    print(f"Using dask_endpoint={args.dask_endpoint}")
    flow.executor = DaskExecutor(address=args.dask_endpoint)

print(
    f"    executor={type(flow.executor).__name__}, run_config={type(flow.run_config).__name__}, result={type(flow.result).__name__}."
)

flow.register(
    project_name=args.project_name,
    build=args.build,
    idempotency_key=args.commit_ref,
    labels=args.labels,
    add_default_labels=False,
)
```
It just spawns `prefect-job` values...
a
1. @haf What do you mean when you say that it spawns `prefect-job` values?
2. You're using an existing external cluster. This means that Prefect will spin up a Kubernetes job for the flow and send the tasks to the Dask workers. In Kubernetes, you should see only one pod for the flow, and the rest of the execution should happen in the external cluster. Can you confirm this behavior in your infrastructure?
3. Is your existing Dask cluster deployed to the same Kubernetes cluster? If so, did you consider using a temporary Dask cluster per flow? `DaskExecutor` is capable of creating a temporary cluster using any of Dask's cluster-manager options. This is useful when you want each flow run to have its own Dask cluster, allowing for adaptive scaling per flow, and it makes it easier to avoid flows competing for cluster resources. This doc covers the pros and cons of each option: https://docs.prefect.io/orchestration/flow_config/executors.html#daskexecutor (see the sketch after this list).
4. If I understand the problem correctly, your `KubernetesAgent` doesn't communicate properly with your Dask cluster. Did you set up permissions (a Kubernetes `Role`) in your Kubernetes agent configuration to allow this communication? This blog post provides an example YAML file for that, if your Dask cluster is on Kubernetes.
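For illustration, a per-flow temporary cluster on Kubernetes could look roughly like this (a minimal sketch, assuming `dask-kubernetes` is installed; the image name and scaling bounds are placeholders, not taken from this thread):
```python
# Sketch of a temporary, per-flow-run Dask cluster on Kubernetes.
# Assumes dask-kubernetes; image and scaling bounds are illustrative.
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow
from prefect.executors import DaskExecutor

executor = DaskExecutor(
    cluster_class=KubeCluster,
    # Pod template for the throwaway Dask workers; swap in your own image.
    cluster_kwargs={"pod_template": make_pod_spec(image="example/data-pipelines:latest")},
    # Adaptive scaling: each flow run scales its own cluster between 1 and 10 workers.
    adapt_kwargs={"minimum": 1, "maximum": 10},
)

with Flow("yourflow", executor=executor) as flow:
    ...  # tasks go here
```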
h
1. It spawns pods named `prefect-job-RANDOM`.
2. No, I can't; that's exactly my problem: everything executes in the `prefect-job` pod that starts the job.
3. Yes, it's deployed on the same infrastructure, on single-purpose nodes. For now, this is the only flow that uses Dask, so it's fine.
4. I was missing these permissions:
```yaml
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "pods/log"
  verbs:
  - "get"
  - "list"
- apiGroups:
  - "" # indicates the core API group
  resources:
  - "services"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
- apiGroups:
  - "policy"  # indicates the policy API group
  resources:
  - "poddisruptionbudgets"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
```
a
@haf does it work now after you added those permissions?
h
Currently testing; need to deploy new version / build 🙂
Hey, no it doesn't.
It's saying:
```
[2021-10-29 14:51:18+0000] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
```
Maybe `executor` is not serialised as configuration to the flow instance when registering.
It's not present in the Cloud UI either.
a
Correct, the executor is not serialized, which means this won't work:
```python
flow.executor = DaskExecutor(...)
flow.register(...)
```
Can you try this instead?
```python
with Flow("yourflow", executor=DaskExecutor()) as flow:
```
h
Yes, that's one option, but it precludes me from running it locally
I'm currently trying using env vars on the job template
I could use an ad-hoc cluster if I can find how to set "tolerations" and cross-datacenter pod affinity and spread policies.
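Not confirmed from this thread, but for what it's worth: tolerations (and, analogously, affinity) can be set on the worker pod template handed to the ad-hoc cluster (a hypothetical sketch, assuming `dask-kubernetes`; the taint key and values are made up):
```python
# Hypothetical sketch: ad-hoc per-flow cluster whose worker pods tolerate a
# node taint. Assumes dask-kubernetes; taint key/value and image are made up.
# Set this in the flow definition module, per the serialization discussion above.
from dask_kubernetes import KubeCluster, make_pod_spec
from kubernetes.client import V1Toleration
from prefect.executors import DaskExecutor

pod_spec = make_pod_spec(image="example/data-pipelines:latest")
pod_spec.spec.tolerations = [
    V1Toleration(key="dedicated", operator="Equal", value="dask", effect="NoSchedule")
]
# Affinity / topology spread would be set the same way, e.g. pod_spec.spec.affinity.

flow.executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={"pod_template": pod_spec},
)
```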
Env vars did it
But now:
```
Exception: "RuntimeError('Unable to find any timezone configuration')"
```
a
Do you need help with this? I would suggest just using this, as discussed:
```python
with Flow("yourflow", executor=DaskExecutor()) as flow:
```
You can still run things locally; under the hood this would spin up a local Dask cluster. Or you could comment it out for a one-time local execution.
h
Can I replace it using env vars in prod then?
Trying
a
I wouldn't do it with environment variables. This can lead to issues when some environments have the environment variable and some don't, and also to issues related to registration time vs. run time. I would rather use one of these options:
1. Comment out that one line when running it locally.
2. Keep two versions of the flow in different repository branches. The dev branch can have one without Dask, or with `LocalDaskExecutor`; the production branch can have this:
```python
with Flow("yourflow", executor=DaskExecutor(production_dask_scheduler_address)) as flow:
```
h
```
[2021-10-29 15:38:18+0000] DEBUG - prefect.CloudFlowRunner | Flow 'run_mmm': Handling state change from Scheduled to Running
[2021-10-29 15:38:19+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
[2021-10-29 15:38:21+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://127.0.0.1:8787/status
/usr/local/lib/python3.8/site-packages/distributed/scheduler.py:5559: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
  warnings.warn(
```
This is when doing `executor=DaskExecutor`, but it doesn't spawn any new pods.
It seems to be running locally inside the pod.
Albeit it does parallelise "well" in that it runs everything at the same time.
(12 tasks in parallel = would need 72 CPUs)
This is with `executor=DaskExecutor()`.
I ended up using `executor=DaskExecutor()` in the flow that needs it; on a single node this means that it does depth-first search. NOT setting the env var to the Dask executor means ALL other flows run on the single node/pod like they should, but then setting the env var with the Dask address makes the `executor=DaskExecutor()` flows automatically pick it up. Became very convenient actually.
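A rough reconstruction of that pattern in the flow definition module (assumed, not copied from the thread; `DASK_SCHEDULER_ADDRESS` is an illustrative variable name):
```python
# Assumed reconstruction of the env-var pattern described above. Because the
# flow module is imported inside the prefect-job pod at run time, an env var
# set on the job template is read there, not at registration time.
import os
from prefect.executors import DaskExecutor

dask_address = os.environ.get("DASK_SCHEDULER_ADDRESS")
flow.executor = (
    DaskExecutor(address=dask_address)
    if dask_address
    else DaskExecutor()  # no address set: in-pod local Dask cluster
)
```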