# prefect-server

Ruslan

03/01/2022, 12:35 PM
Hi! How can I run tasks in parallel with the Kubernetes agent? I have a flow with 10 tasks inside, and I want them to run in 3 threads. LocalDaskExecutor/DaskExecutor is not working.

Anna Geller

03/01/2022, 12:49 PM
Check out this Discourse topic. It links to further topics and resources about mapping, various executors, various types of Dask clusters and more: https://discourse.prefect.io/t/how-can-i-configure-my-flow-to-run-with-dask/45 If you have any specific questions after that, let us know.
Specifically, to configure 3 threads, you could use:
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
For more about that, check this topic: https://discourse.prefect.io/t/how-can-i-parallelize-execution-across-8-cpu-cores/103

Ruslan

03/01/2022, 12:52 PM
Yes, it worked fine with the local agent, but it doesn't work with the Kubernetes agent. Whatever I set, num_workers is effectively only 1.

Anna Geller

03/01/2022, 12:56 PM
Can you first check out the resources I shared? 🙂 Alternatively, can you provide more information? Just saying "it doesn't work" doesn't give much information to identify what's wrong in your setup 😄
1. How many CPU resources did you allocate to your Kubernetes job template or KubernetesRun?
2. How did you configure your Kubernetes agent?
3. Do you use Prefect Cloud or Server?
4. Do you use mapping?
5. Can you share your flow or a simplified version of it?
num_workers is not about worker nodes (VMs), but this way you can influence how many processes or threads to use. From the docs: "By default the number of threads or processes used is equal to the number of cores available. This can be set explicitly by passing in num_workers."
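To make the meaning of a worker count concrete, here is a minimal sketch that uses only the Python standard library. It is not Prefect or Dask code: `ThreadPoolExecutor` stands in for a thread-based scheduler, and `work`, `NUM_WORKERS`, and `NUM_TASKS` are hypothetical names chosen for illustration. It runs 10 tasks on a pool capped at 3 threads and records how many ran at the same time.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

NUM_WORKERS = 3   # plays the role of num_workers=3 (illustrative name)
NUM_TASKS = 10    # the 10 tasks from the question

_lock = threading.Lock()
_running = 0
peak_running = 0


def work(i: int) -> int:
    """Stand-in for a task: track concurrency, sleep briefly, return a result."""
    global _running, peak_running
    with _lock:
        _running += 1
        peak_running = max(peak_running, _running)
    time.sleep(0.05)  # simulate I/O-bound work
    with _lock:
        _running -= 1
    return i * i


# The core count Python reports; the quoted docs name the core count as the default.
print("cores visible to Python:", os.cpu_count())

# Cap the pool at NUM_WORKERS threads, regardless of the core count.
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as pool:
    results = list(pool.map(work, range(NUM_TASKS)))

print("results:", results)
print("peak concurrent tasks:", peak_running)
```

The peak never exceeds the cap, whatever the number of cores or nodes; that is the sense in which the worker count is about threads rather than VMs.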

Ruslan

03/01/2022, 1:25 PM
# Prefect 1.x import paths
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Git


def register():
    # prefect_flow() is defined elsewhere (not shown) and builds the Flow object
    flow = prefect_flow()

    flow.storage = Git(git_token_secret_name="github",
                       repo="sravni/etl", repo_host="github.com",
                       flow_path="flow/News.py")

    flow.run_config = KubernetesRun(
        image="sravni.azurecr.io/dwh-prefect:latest",
        image_pull_policy="Always",
        job_template_path="k8s/job_template.yaml",
        env={"path_tmp": "/tmp/replicator/"}
    )

    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
    flow.register(project_name="dwh", labels=['k8s'])
The same flow worked on the local agent; now I'm moving to Kubernetes and only 1 thread is working.
1. I didn't set any CPU limits.
2. The agent has 1 CPU.
3. Cloud.
4. What is mapping?
I checked the limits inside the running container: there are no CPU limits.
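For readers who want to repeat that check, the following is a rough sketch of how CPU visibility and quotas can be inspected from Python inside a Linux container. It assumes the standard cgroup v1 and v2 file locations, which may differ or be absent in a given runtime; `read_cgroup_cpu_limit` and `cpu_report` are hypothetical helper names, not part of Prefect or Kubernetes.

```python
import os
from pathlib import Path
from typing import Optional


def read_cgroup_cpu_limit() -> Optional[float]:
    """Return the CPU quota in cores, or None if no quota is set or readable."""
    # cgroup v2: one file containing "<quota> <period>" or "max <period>".
    v2 = Path("/sys/fs/cgroup/cpu.max")
    # cgroup v1: quota and period are separate files; a quota of -1 means unlimited.
    v1_quota = Path("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
    v1_period = Path("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
    try:
        if v2.exists():
            quota, period = v2.read_text().split()
            return None if quota == "max" else int(quota) / int(period)
        if v1_quota.exists() and v1_period.exists():
            quota = int(v1_quota.read_text())
            period = int(v1_period.read_text())
            return None if quota <= 0 else quota / period
    except (OSError, ValueError, ZeroDivisionError):
        pass  # unreadable or unexpected format: treat as "no known limit"
    return None


def cpu_report() -> dict:
    """Collect what this process can see about available CPUs."""
    report = {
        "os_cpu_count": os.cpu_count(),
        "cgroup_cpu_limit": read_cgroup_cpu_limit(),
    }
    # sched_getaffinity exists only on some platforms (e.g. Linux).
    if hasattr(os, "sched_getaffinity"):
        report["affinity_cpus"] = len(os.sched_getaffinity(0))
    return report


print(cpu_report())
```

A `cgroup_cpu_limit` of `None` matches the "no CPU limits" observation above; it does not by itself say how many threads an executor will start.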

Anna Geller

03/01/2022, 1:28 PM
Try attaching the executor directly to your flow object (wherever you define your flow), e.g.:
with Flow("somename", executor=LocalDaskExecutor(scheduler="threads", num_workers=3)) as flow:
    ...  # define your tasks here
The reason for this is that Prefect doesn't store the executor info in the backend; it pulls it from storage, and your flow seems to be stored in a separate file?
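The mechanism can be illustrated with a simplified simulation that uses only the Python standard library. It is not Prefect code: `FakeFlow`, `load_flow_from_file`, and the temporary `News.py` are hypothetical stand-ins, and the sketch assumes that a flow run re-creates the flow by executing the stored file. Under that assumption, an executor set only on the registration script's in-memory object is absent at run time, while one declared in the flow file survives.

```python
import runpy
import tempfile
from pathlib import Path

# Hypothetical flow file, standing in for the file referenced by flow_path.
FLOW_FILE_SOURCE = '''
class FakeFlow:
    def __init__(self, name, executor="default (sequential)"):
        self.name = name
        self.executor = executor

flow = FakeFlow("somename")
'''


def load_flow_from_file(path: Path):
    """Re-create the flow by executing the file, as a run loading from storage would."""
    return runpy.run_path(str(path))["flow"]


with tempfile.TemporaryDirectory() as tmp:
    flow_path = Path(tmp) / "News.py"
    flow_path.write_text(FLOW_FILE_SOURCE)

    # Registration script: sets the executor on its own in-memory copy only.
    registered = load_flow_from_file(flow_path)
    registered.executor = "threads, num_workers=3"

    # Flow run: loads a fresh copy from the file, so the attribute set above is gone.
    executor_before_fix = load_flow_from_file(flow_path).executor

    # Fix: declare the executor inside the flow file itself.
    flow_path.write_text(FLOW_FILE_SOURCE.replace(
        'FakeFlow("somename")',
        'FakeFlow("somename", executor="threads, num_workers=3")',
    ))
    executor_after_fix = load_flow_from_file(flow_path).executor

print("before fix:", executor_before_fix)
print("after fix: ", executor_after_fix)
```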

Ruslan

03/01/2022, 2:53 PM
with Flow("somename", executor = LocalDaskExecutor(scheduler="threads", num_workers=3)) as flow:
That fixed it! Thank you, Anna!