Joël Luijmes

12/03/2020, 12:31 PM
Hey, question! I’m running Prefect in Kubernetes, and I began working on a flow which fires up 10 Kubernetes jobs. However, it currently spawns these jobs sequentially, while I want them all to start immediately. To that end, I deployed Dask to the cluster (with https://helm.dask.org), used the DaskExecutor, and pointed it at the scheduler. However, the same thing still happens 😞. Is there other configuration required for this to work? I tried running the flow locally with the DaskExecutor, and then it does its magic in parallel 🙂 (Note: tested it with a different flow.) I’ll post the details in the thread.
Schematic shows the flow as expected in the UI. Slack fails, but we can ignore that for now; I probably did something wrong with the secret.
Timeline shows the sequential execution.

Dimitris Stafylarakis

12/03/2020, 12:33 PM
hi @Joël Luijmes I'm working on a similar setup 🙂 did you specify the address for the DaskExecutor?

Joël Luijmes

12/03/2020, 12:33 PM
~> kgs                                                                                    
NAME                          TYPE           CLUSTER-IP     EXTERNAL-IP     PORT(S)           AGE
dask-scheduler                ClusterIP      10.32.4.76     <none>          8786/TCP,80/TCP   43s
prefect-postgresql            ClusterIP      10.32.0.123    <none>          5432/TCP          20d
prefect-postgresql-headless   ClusterIP      None           <none>          5432/TCP          20d
prefect-server-apollo         LoadBalancer   10.32.15.162   34.91.229.28    4200:32755/TCP    20d
prefect-server-graphql        ClusterIP      10.32.6.217    <none>          4201/TCP          20d
prefect-server-hasura         ClusterIP      10.32.13.168   <none>          3000/TCP          20d
prefect-server-ui             LoadBalancer   10.32.7.203    34.91.165.136   8080:32471/TCP    20d
~> kgp                                                                                    
NAME                                      READY   STATUS      RESTARTS   AGE
dask-scheduler-9b4bdffc-vrq9g             1/1     Running     0          14m
dask-worker-6c9755b48-jg26c               1/1     Running     0          14m
dask-worker-6c9755b48-lxptc               1/1     Running     0          14m
dask-worker-6c9755b48-tk2bz               1/1     Running     0          14m
hello-7t8qs                               0/2     Completed   0          9d
prefect-agent-594d644fd-cpzxt             1/1     Running     0          52m
prefect-postgresql-0                      1/1     Running     0          20d
prefect-server-apollo-77db95d99-st9l4     1/1     Running     0          3h22m
prefect-server-graphql-846796fdcd-9xkf6   1/1     Running     0          3h22m
prefect-server-hasura-59b994c54-nd55h     1/1     Running     3          20d
prefect-server-towel-67d57669b9-nrszp     1/1     Running     0          20d
prefect-server-ui-78ff94b998-lfmj6        1/1     Running     0          3h22m

Dimitris Stafylarakis

12/03/2020, 12:33 PM
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")

Joël Luijmes

12/03/2020, 12:33 PM
from prefect import Flow
from prefect.engine.executors import DaskExecutor
from prefect.environments.storage import Docker
from prefect.tasks.notifications import SlackTask

# create_job_task and generate_job_specs are defined elsewhere in this project

if __name__ == "__main__":
    # Store the flow in the following GCR
    storage_gcr = Docker(
        registry_url="XX",
    )

    # Use a DaskExecutor to parallelize tasks within the flow
    executor_dask = DaskExecutor(address="tcp://dask-scheduler:8786")

    # Generate Prefect tasks based on the generated job specs
    tasks_syncs = [create_job_task(job_spec) for job_spec in generate_job_specs()]

    # Sends a notification to Slack on completion
    task_slack_completed = SlackTask(
        name="slack",
        message=f"Completed syncing {len(tasks_syncs)} jobs"
    )

    with Flow("CloudSQL to BigQuery", storage=storage_gcr, executor=executor_dask) as flow:
        task_slack_completed.set_dependencies(upstream_tasks=tasks_syncs)

    flow.register(project_name="root")
@Dimitris Stafylarakis quick reply 😅 but yea, just posted the code showing the address

Dimitris Stafylarakis

12/03/2020, 12:34 PM
👍

Joël Luijmes

12/03/2020, 12:36 PM
So, any other suggestions? I’m relatively new to Dask, so I have no idea about the possible configuration options. Could it be that Dask only starts working in parallel once more work is scheduled, e.g. after the backlog grows? I found some other documentation mentioning
min_workers
but that was with the legacy environment thingy.

Dimitris Stafylarakis

12/03/2020, 12:36 PM
can you check the dask UI for clues?
no dask expert either, I just happen to be working on the same thing atm 😄
so FYI I've deployed the example in the docs

Joël Luijmes

12/03/2020, 12:41 PM
Hmm, my Dask UI stays empty; maybe it’s not actually using the DaskExecutor 😮
Any ideas how I can validate that?
Dask’s logging doesn’t show anything either (unsure if that is to be expected)
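One quick way to validate the connection (a sketch, not from the thread): open a Python shell in a pod inside the cluster and talk to the scheduler directly, assuming the dask-scheduler service from the listing above.
from dask.distributed import Client

# Connect to the same scheduler address the flow is configured with.
client = Client("tcp://dask-scheduler:8786")

# scheduler_info() lists the connected workers; if the flow really ran on
# this scheduler, its tasks would also show up in the Dask dashboard
# (exposed on the scheduler service's second port in the listing above).
print(list(client.scheduler_info()["workers"]))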

Dimitris Stafylarakis

12/03/2020, 12:47 PM
hmm, have you installed prefect on dask?
at least the workers would need it

Joël Luijmes

12/03/2020, 12:49 PM
No, at least not that I’m aware 😅 My steps, more or less:
1. Deploy Prefect to Kubernetes (incl. agent)
2. Deploy Dask with the helm chart
3. Create flow in a docker container, set executor to Dask with the address
4. Press run and hope for the best

Dimitris Stafylarakis

12/03/2020, 12:49 PM
if you install prefect, you can hope for more 😄
helm chart allows this
you can add the env var EXTRA_PIP_PACKAGES, check the docs
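For reference, a sketch of the helm values that would do that (the EXTRA_PIP_PACKAGES env var is picked up by the images used in the dask chart; installing plain `prefect` with no version pin here is just an example):
# values.yaml for the dask helm chart: the container entrypoint installs
# whatever is listed in EXTRA_PIP_PACKAGES at startup, so the workers
# get prefect and can deserialize and run Prefect tasks.
worker:
  env:
    - name: EXTRA_PIP_PACKAGES
      value: prefect
Applied with something like helm upgrade dask dask/dask -f values.yaml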

Joël Luijmes

12/03/2020, 12:55 PM
Hmm, that didn’t help either. I think the fault is somewhere in my code/setup, or the Kubernetes agent is refusing to use Dask 😮
Also set
- name: PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS
  value: prefect.engine.executors.DaskExecutor
- name: PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS
  value: tcp://dask-scheduler:8786
on the Kubernetes agent, but still no luck

Dimitris Stafylarakis

12/03/2020, 12:56 PM
checked the job pod's logs?

Joël Luijmes

12/03/2020, 12:58 PM
Yea, the job works just fine; it’s only that Prefect creates the jobs serially instead of using Dask to parallelize them

Dimitris Stafylarakis

12/03/2020, 1:09 PM
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor
from prefect.engine.result_handlers import LocalResultHandler
from prefect.environments.storage import Docker
from prefect.run_configs import KubernetesRun

import datetime
import random
from time import sleep


@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1


@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1


@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y


@task(name="sum")
def list_sum(arr):
    return sum(arr)


with Flow("dask-example", result_handler=LocalResultHandler()) as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")
flow.storage = Docker()
flow.run_config = KubernetesRun(labels=["dev"])
flow.register(project_name="Hello World")
this worked, LocalResultHandler is probably not relevant, but see if this gets you any further

Joël Luijmes

12/03/2020, 1:17 PM
Kay, good idea, let’s just test the demo code
Hmm, that kinda works; I do see lots of errors in the logs, but the tasks are performed in parallel

Dimitris Stafylarakis

12/03/2020, 1:25 PM
👍
hope this sets you on the right track

Joël Luijmes

12/03/2020, 1:32 PM
Cool, got my original flow working. The result_handler didn’t do anything, but I noticed that the example specifies
run_config
After adding that, it works like a charm 👍 Thanks @Dimitris Stafylarakis for thinking along with me! 🙂
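In other words, the missing piece was attaching a run config before registering (a sketch applied to the flow from earlier in the thread; the "dev" label is taken from the example above and may differ per agent setup):
from prefect.run_configs import KubernetesRun

# Tell the agent how to launch this flow run; the labels must match the agent's.
flow.run_config = KubernetesRun(labels=["dev"])
flow.register(project_name="root")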

Dimitris Stafylarakis

12/03/2020, 1:33 PM
👍