haf, 10/29/2021, 8:47 AM:
Using dask_endpoint=tcp://dask-scheduler.flows.svc:8786
Registering flow with label=['prod'] image=example/data-pipelines:cb40d2797195791a3cf195fa1906a1722222222
Registering... executor=DaskExecutor, run_config=KubernetesRun.
From this code:
if args.dask:
    print(f"Using dask_endpoint={args.dask_endpoint}")
    flow.executor = DaskExecutor(address=args.dask_endpoint)
print(
    f" executor={type(flow.executor).__name__}, run_config={type(flow.run_config).__name__}, result={type(flow.result).__name__}."
)
flow.register(
    project_name=args.project_name,
    build=args.build,
    idempotency_key=args.commit_ref,
    labels=args.labels,
    add_default_labels=False,
)
It just spawns prefect-job values...

Anna Geller:
1. prefect-job values?
2. You’re using an existing external cluster. This means that Prefect will spin up a Kubernetes job for the flow, and it will send the tasks to Dask workers. In Kubernetes, you should see only one pod for the flow, and the rest of the execution should happen in the external cluster. Can you confirm this behavior in your infrastructure?
3. Is your existing Dask cluster deployed to the same Kubernetes cluster? If so, did you consider using a temporary Dask cluster per flow? DaskExecutor is capable of creating a temporary cluster using any of Dask's cluster-manager options (see the sketch after this list). This is useful when you want each flow run to have its own Dask cluster, allowing adaptive scaling per flow, and it makes it easier to avoid flows competing for cluster resources. This doc covers the pros and cons of each option: https://docs.prefect.io/orchestration/flow_config/executors.html#daskexecutor
4. If I understand the problem correctly, your KubernetesAgent doesn't communicate properly with your Dask cluster. Did you set up permissions (a Kubernetes Role) in your Kubernetes agent configuration to allow this communication? This blog post provides an example YAML file for that, if your Dask cluster is on Kubernetes.
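(For reference, the temporary-cluster option from point 3 looks roughly like this; a sketch based on the executors doc linked above, with the scaling bounds as an assumption, not anything from this thread:)

import prefect
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec

# Create a short-lived Dask cluster on Kubernetes for each flow run,
# reusing the image the flow itself runs with, and let it scale
# adaptively between 1 and 10 workers.
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
    adapt_kwargs={"minimum": 1, "maximum": 10},
)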
haf, 10/29/2021, 1:25 PM:
1. prefect-job-RANDOM
2. No, I can't; this is my problem. Everything executes in the prefect-job pod that starts the job.
3. Yes, it's deployed on the same infrastructure, on single-purpose nodes. For now, this is the only flow that uses Dask, so it's fine.
4. I was missing these permissions:
- apiGroups:
    - "" # indicates the core API group
  resources:
    - "pods/log"
  verbs:
    - "get"
    - "list"
- apiGroups:
    - "" # indicates the core API group
  resources:
    - "services"
  verbs:
    - "get"
    - "list"
    - "watch"
    - "create"
    - "delete"
- apiGroups:
    - "policy" # indicates the policy API group
  resources:
    - "poddisruptionbudgets"
  verbs:
    - "get"
    - "list"
    - "watch"
    - "create"
    - "delete"
haf, 10/29/2021, 3:03 PM:
[2021-10-29 14:51:18+0000] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
haf, 10/29/2021, 3:09 PM:
The executor is not serialised as configuration on the flow instance when registering.
Anna Geller:

flow.executor = DaskExecutor(...)
flow.register(...)

Can you try this instead?

with Flow("yourflow", executor=DaskExecutor()) as flow:
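(A fuller version of Anna's suggestion; the task body, scheduler address, and project name below are made up for illustration:)

import prefect
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def say_hello():
    prefect.context.get("logger").info("hello")

# Attach the executor when the flow is constructed so it travels with the
# stored flow object; as noted above, the executor is not serialised to
# the backend during registration.
with Flow(
    "yourflow",
    executor=DaskExecutor(address="tcp://dask-scheduler.flows.svc:8786"),
) as flow:
    say_hello()

flow.register(project_name="example")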
haf, 10/29/2021, 3:18 PM:
Exception: "RuntimeError('Unable to find any timezone configuration')"
Anna Geller:
With

with Flow("yourflow", executor=DaskExecutor()) as flow:

you can still run things locally; under the hood this would spin up a local Dask cluster. Or you could comment it out for a one-time local execution?
Anna Geller:

with Flow("yourflow", executor=DaskExecutor(production_dask_scheduler_address)) as flow:
haf, 10/29/2021, 3:40 PM:
[2021-10-29 15:38:18+0000] DEBUG - prefect.CloudFlowRunner | Flow 'run_mmm': Handling state change from Scheduled to Running
[2021-10-29 15:38:19+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
[2021-10-29 15:38:21+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://127.0.0.1:8787/status
/usr/local/lib/python3.8/site-packages/distributed/scheduler.py:5559: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
  warnings.warn(
haf, 10/29/2021, 3:41 PM:
executor=DaskExecutor but doesn't spawn any new pods

haf, 10/29/2021, 3:42 PM:
executor=DaskExecutor(),
haf, 10/30/2021, 4:06 PM:
I set executor=DaskExecutor() in the flow that needs it; on a single node this means it does depth-first search. NOT setting the env var means ALL other flows run on the single node/pod like they should, but setting the env var with the Dask address makes the executor=DaskExecutor() flows pick it up automatically. Became very convenient actually.
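(A sketch of one way to wire up that pattern; the DASK_ADDRESS variable name is an assumption, since haf's exact mechanism isn't shown in the thread:)

import os
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def noop():
    pass

# With no address set, DaskExecutor() creates a temporary local Dask
# cluster inside the flow's single pod (depth-first execution, as seen
# in the LocalCluster log lines above); when the env var carries the
# scheduler address, the same flow connects to the shared cluster.
executor = DaskExecutor(address=os.environ.get("DASK_ADDRESS"))  # hypothetical var name

with Flow("run_mmm", executor=executor) as flow:
    noop()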