haf

    10 months ago
    Here's another noob question from me: I can't get the Dask executor to pick up work from the Prefect k8s Agent / Cloud. Here's some output:
    Using dask_endpoint=tcp://dask-scheduler.flows.svc:8786
    Registering flow with label=['prod'] image=example/data-pipelines:cb40d2797195791a3cf195fa1906a1722222222
    Registering... executor=DaskExecutor, run_config=KubernetesRun.
    From this code:
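    from prefect.executors import DaskExecutor

    if args.dask:
        print(f"Using dask_endpoint={args.dask_endpoint}")
        # Attach an executor that targets the existing external Dask scheduler
        flow.executor = DaskExecutor(address=args.dask_endpoint)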
    
    print(
        f"    executor={type(flow.executor).__name__}, run_config={type(flow.run_config).__name__}, result={type(flow.result).__name__}."
    )
    
    flow.register(
        project_name=args.project_name,
        build=args.build,
        idempotency_key=args.commit_ref,
        labels=args.labels,
        add_default_labels=False,
    )
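The snippet reads its settings from an `args` namespace whose definition isn't shown. A minimal sketch of how that namespace could be built with `argparse`; the flag names (`--dask`, `--dask-endpoint`, `--project-name`, `--no-build`, `--commit-ref`, `--label`) and their defaults are hypothetical, inferred only from the attribute names the snippet uses.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Build the `args` namespace read by the registration snippet.

    Every flag name and default here is hypothetical, inferred from the
    attributes the snippet accesses (args.dask, args.dask_endpoint, ...).
    """
    parser = argparse.ArgumentParser(description="Register a Prefect flow")
    # Opt in to attaching a DaskExecutor at registration time.
    parser.add_argument("--dask", action="store_true")
    # Default mirrors the scheduler address printed in the output above.
    parser.add_argument("--dask-endpoint", default="tcp://dask-scheduler.flows.svc:8786")
    parser.add_argument("--project-name", default="default")
    # store_false: args.build is True unless --no-build is given.
    parser.add_argument("--no-build", dest="build", action="store_false")
    # Passed to flow.register(idempotency_key=...) in the snippet above.
    parser.add_argument("--commit-ref", default=None)
    # Repeatable flag: --label prod --label gpu -> ["prod", "gpu"].
    parser.add_argument("--label", dest="labels", action="append", default=[])
    return parser.parse_args(argv)
```

For example, `parse_args(["--dask", "--label", "prod"])` yields `dask=True`, `labels=["prod"]`, and `build=True`.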
    It just spawns `prefect-job` values...
    Anna Geller

    10 months ago
    1. @haf What do you mean when you say that it spawns `prefect-job` values?
    2. You’re using an existing external cluster. This means that Prefect will spin up a Kubernetes job for the flow, and it will send the tasks to Dask workers. In Kubernetes, you should see only one pod for the flow, and the rest of the execution should happen in the external cluster. Can you confirm this behavior in your infrastructure?
    3. Is your existing Dask cluster deployed to the same Kubernetes cluster? If so, did you consider using a temporary Dask cluster per flow? `DaskExecutor` is capable of creating a temporary cluster using any of Dask's cluster-manager options. This is useful when you want each flow run to have its own Dask cluster, allowing for adaptive scaling per flow, and it makes it easier to avoid flows competing for cluster resources. This doc covers the pros and cons of each option: https://docs.prefect.io/orchestration/flow_config/executors.html#daskexecutor
    4. If I understand the problem correctly, your `KubernetesAgent` doesn’t communicate properly with your Dask cluster. Did you set up permissions (a Kubernetes `Role`) in your Kubernetes agent configuration to allow this communication? This blog post provides an example YAML file for that, if your Dask cluster is on Kubernetes.
    haf

    10 months ago
    1. It spawns pods named `prefect-job-RANDOM`.
    2. No, I can't; this is my problem: everything executes in the `prefect-job` pod that starts the job.
    3. Yes, it's deployed on the same infrastructure, on single-purpose nodes. For now, this is the only flow that uses Dask, so it's fine.
    4. I was missing these permissions:
    - apiGroups:
      - ""  # indicates the core API group
      resources:
      - "pods/log"
      verbs:
      - "get"
      - "list"
    - apiGroups:
      - "" # indicates the core API group
      resources:
      - "services"
      verbs:
      - "get"
      - "list"
      - "watch"
      - "create"
      - "delete"
    - apiGroups:
      - "policy"  # indicates the policy API group
      resources:
      - "poddisruptionbudgets"
      verbs:
      - "get"
      - "list"
      - "watch"
      - "create"
      - "delete"
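The three rules above are entries of a Role's `rules:` list. A sketch that wraps them into a complete `Role` plus a `RoleBinding`, emitted as JSON (which `kubectl apply -f` accepts as well as YAML). Assumptions: the namespace `flows` is guessed from the scheduler's DNS name `dask-scheduler.flows.svc`, and the Role, binding, and service-account names are hypothetical; bind the Role to whichever service account your agent or flow-run pods actually use.

```python
import json

CORE = ""  # the empty string selects the core API group


def dask_rules():
    """The three rules from the message above, as Python data."""
    return [
        {"apiGroups": [CORE], "resources": ["pods/log"], "verbs": ["get", "list"]},
        {
            "apiGroups": [CORE],
            "resources": ["services"],
            "verbs": ["get", "list", "watch", "create", "delete"],
        },
        {
            "apiGroups": ["policy"],
            "resources": ["poddisruptionbudgets"],
            "verbs": ["get", "list", "watch", "create", "delete"],
        },
    ]


def role_manifests(namespace, role_name, service_account):
    """Return a Kubernetes List holding the Role and a RoleBinding granting it."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": role_name, "namespace": namespace},
        "rules": dask_rules(),
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": f"{role_name}-binding", "namespace": namespace},
        # Grant the Role to a service account in the same namespace.
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": namespace}
        ],
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "Role",
            "name": role_name,
        },
    }
    return {"apiVersion": "v1", "kind": "List", "items": [role, binding]}


if __name__ == "__main__":
    # Hypothetical names; pipe the output to `kubectl apply -f -`.
    print(json.dumps(role_manifests("flows", "prefect-dask", "prefect-agent"), indent=2))
```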
    Anna Geller

    10 months ago
    @haf does it work now after you added those permissions?
    haf

    10 months ago
    Currently testing; need to deploy new version / build 🙂
    Hey, no, it doesn't.
    It's saying:
    [2021-10-29 14:51:18+0000] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
    Maybe `executor` is not serialised as configuration on the flow instance when registering, since it isn't present in the Cloud UI either.
    Anna Geller

    10 months ago
    Correct, the executor is not serialized, which means that this won’t work:
    flow.executor = DaskExecutor(...)
    flow.register(...)
    Can you try this instead?
    with Flow("yourflow", executor=DaskExecutor()) as flow:
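One way to picture why the `flow.executor = ...` assignment can be lost: if the flow file is executed again from storage to load the flow for a run (as with script-based storage), only module-level code is rebuilt. The stdlib-only model below assumes that kind of storage and assumes the registration code sits under an `if __name__ == "__main__":` guard (not shown in the snippet); `Flow` is a stand-in class and executors are plain strings, not Prefect's objects.

```python
# Hypothetical flow file, as it might be kept in script-based storage.
FLOW_SCRIPT = '''
class Flow:  # stand-in for prefect.Flow, not the real class
    def __init__(self, name, executor="LocalExecutor"):
        self.name = name
        self.executor = executor

flow = Flow("yourflow")  # module level: rebuilt every time the file runs

if __name__ == "__main__":
    # Registration-only code: runs when the file is executed as a script,
    # but not when storage re-executes it to load the flow for a run.
    flow.executor = "DaskExecutor"
'''

# Same file, but with the executor declared in the Flow(...) call.
FIXED_SCRIPT = FLOW_SCRIPT.replace(
    'flow = Flow("yourflow")', 'flow = Flow("yourflow", executor="DaskExecutor")'
)


def load_flow(script, as_main):
    """Execute a flow file and return its module-level `flow` object."""
    namespace = {"__name__": "__main__" if as_main else "flow_storage"}
    exec(script, namespace)
    return namespace["flow"]
```

Under these assumptions, the registration-time load sees `DaskExecutor` while the run-time load falls back to the default, matching the `Using executor type LocalExecutor` log above; declaring the executor in `Flow(...)` keeps it in both paths.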
    haf

    10 months ago
    Yes, that's one option, but it precludes me from running it locally.
    I'm currently trying env vars on the job template.
    I could use an ad-hoc cluster if I can find out how to set tolerations, cross-datacenter pod affinity, and spread policies.
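For the ad-hoc cluster route, the settings mentioned here are ordinary Kubernetes Pod spec fields: `tolerations`, `affinity`, and `topologySpreadConstraints`. A sketch of a Dask worker pod template as a Python dict, assuming a hypothetical `dedicated=dask` taint and label on the single-purpose nodes, a hypothetical `app: dask-worker` pod label, and zones identified by the standard `topology.kubernetes.io/zone` node label. How such a template is handed to `dask_kubernetes.KubeCluster` varies between dask-kubernetes versions, so check that before relying on it.

```python
ZONE_KEY = "topology.kubernetes.io/zone"  # well-known node label for zones


def dask_worker_pod(image, node_label=("dedicated", "dask"), max_skew=1):
    """Return a Pod manifest for Dask workers pinned to dedicated nodes.

    The `dedicated=dask` taint/label pair and the `app: dask-worker` pod
    label are hypothetical; adjust them to match your node pool.
    """
    key, value = node_label
    pod_labels = {"app": "dask-worker"}
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"labels": pod_labels},
        "spec": {
            "restartPolicy": "Never",
            # Allow scheduling onto nodes tainted dedicated=dask:NoSchedule.
            "tolerations": [
                {"key": key, "operator": "Equal", "value": value, "effect": "NoSchedule"}
            ],
            # Require nodes carrying the matching label.
            "affinity": {
                "nodeAffinity": {
                    "requiredDuringSchedulingIgnoredDuringExecution": {
                        "nodeSelectorTerms": [
                            {"matchExpressions": [
                                {"key": key, "operator": "In", "values": [value]}
                            ]}
                        ]
                    }
                }
            },
            # Spread workers across zones (data centres), skew of at most max_skew.
            "topologySpreadConstraints": [
                {
                    "maxSkew": max_skew,
                    "topologyKey": ZONE_KEY,
                    "whenUnsatisfiable": "ScheduleAnyway",
                    "labelSelector": {"matchLabels": pod_labels},
                }
            ],
            "containers": [
                {
                    "name": "dask-worker",
                    "image": image,
                    "args": ["dask-worker", "--nthreads", "1", "--death-timeout", "60"],
                }
            ],
        },
    }
```

`whenUnsatisfiable: ScheduleAnyway` treats the spread as a preference; `DoNotSchedule` would make it a hard requirement.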
    Env vars did it
    But now:
    Exception: "RuntimeError('Unable to find any timezone configuration')"
    Anna Geller

    10 months ago
    Do you need help with this? I would suggest just using this, as discussed:
    with Flow("yourflow", executor=DaskExecutor()) as flow:
    You can still run things locally; under the hood this would spin up a local Dask cluster. Or you could comment it out for a one-time local execution.
    haf

    10 months ago
    Can I replace it using env vars in prod then?
    Trying
    Anna Geller

    10 months ago
    I wouldn’t do it with environment variables. This can lead to issues when some environments have these environment variables and some don’t, and also to issues related to registration time vs. run time. I would rather use one of these options:
    1. Comment out this one line when running it locally.
    2. Have two versions of the flow in different repository branches. The dev branch can have one without Dask, or with LocalDaskExecutor; the production branch can have this:
    with Flow("yourflow", executor=DaskExecutor(production_dask_scheduler_address)) as flow:
    haf

    10 months ago
    [2021-10-29 15:38:18+0000] DEBUG - prefect.CloudFlowRunner | Flow 'run_mmm': Handling state change from Scheduled to Running
    [2021-10-29 15:38:19+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
    [2021-10-29 15:38:21+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://127.0.0.1:8787/status
    /usr/local/lib/python3.8/site-packages/distributed/scheduler.py:5559: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.
      warnings.warn(
    This happens when doing `executor=DaskExecutor`, but it doesn't spawn any new pods.
    It seems to be running locally inside the pod.
    It does parallelise "well", though, in that it runs everything at the same time
    (12 tasks in parallel would need 72 CPUs).
    This is with `executor=DaskExecutor()`.
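The 72-CPU figure works out to 72 / 12 = 6 CPUs per task. When `DaskExecutor()` falls back to a `LocalCluster` inside the flow pod, its parallelism can be capped through `cluster_kwargs`, which Prefect passes to the cluster class; `n_workers` and `threads_per_worker` are `LocalCluster` parameters. A sketch of the sizing calculation, assuming CPU-bound tasks, one task per single-threaded worker, and a hypothetical 16-CPU node:

```python
def local_cluster_kwargs(node_cpus, cpus_per_task):
    """Size a LocalCluster so that concurrent tasks fit on one node.

    Assumes each task is CPU-bound and needs `cpus_per_task` cores, and
    that one single-threaded worker runs one task at a time.
    """
    if node_cpus <= 0 or cpus_per_task <= 0:
        raise ValueError("CPU counts must be positive")
    # Never go below one worker, even if a single task exceeds the node.
    n_workers = max(1, node_cpus // cpus_per_task)
    return {"n_workers": n_workers, "threads_per_worker": 1}


# From the message above: 12 parallel tasks would need 72 CPUs.
cpus_per_task = 72 // 12
kwargs = local_cluster_kwargs(node_cpus=16, cpus_per_task=cpus_per_task)
# In a Prefect 1.x flow this would be used as, e.g.:
#   DaskExecutor(cluster_kwargs=kwargs)
```

With these numbers the node runs two tasks at a time instead of all twelve, trading wall-clock time for not oversubscribing the pod.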
    I ended up using `executor=DaskExecutor()` in the flow that needs it; on a single node this means that it does depth-first search. NOT setting the env var that makes Dask the default executor means ALL other flows run on the single node/pod like they should, while setting the env var with the Dask address makes the `executor=DaskExecutor()` flows pick it up automatically. It became very convenient, actually.
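A sketch of the environment described above, assuming Prefect 1.x's convention that a `PREFECT__SECTION__KEY` environment variable overrides the matching config value, and that `DaskExecutor()` with no `address` falls back to the `engine.executor.dask.address` setting (and to a local cluster when that is empty). `resolve_dask_target` is a simplified model of that fallback for illustration only, not Prefect's code.

```python
from typing import Dict, Optional


def prefect_env_var(config_key: str) -> str:
    """Map a dotted Prefect config key to its environment-variable override."""
    return "PREFECT__" + "__".join(part.upper() for part in config_key.split("."))


DASK_ADDRESS_VAR = prefect_env_var("engine.executor.dask.address")


def prod_job_env(scheduler_address: str) -> Dict[str, str]:
    """Env vars for the prod job template: only the Dask address is set.

    The default executor class is deliberately left alone, so flows without
    an explicit executor keep running on LocalExecutor inside their own pod.
    """
    return {DASK_ADDRESS_VAR: scheduler_address}


def resolve_dask_target(address: Optional[str], env: Dict[str, str]) -> str:
    """Simplified model: where would DaskExecutor(address=address) send work?"""
    return address or env.get(DASK_ADDRESS_VAR) or "LocalCluster in the flow pod"
```

In Prefect 1.x, the dict from `prod_job_env(...)` could be supplied through `KubernetesRun(env=...)` or set on the job template, so the same flow code uses the external scheduler in prod and a local cluster elsewhere.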