Ruslan
03/01/2022, 12:35 PM
Anna Geller
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
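As a rough analogy in plain Python, using the standard library's `ThreadPoolExecutor` rather than Prefect or Dask, `scheduler="threads", num_workers=3` means one process running tasks on at most three threads. It says nothing about cluster nodes. The helper `resolve_num_workers` and the task `slow_task` are hypothetical. The fallback to the core count follows the docs quote later in this thread, although Dask's real default may compute the core count differently, for example inside containers.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


def resolve_num_workers(num_workers: Optional[int] = None) -> int:
    # Hypothetical helper mirroring the documented default: an explicit
    # num_workers wins, otherwise fall back to the number of cores.
    if num_workers is not None:
        if num_workers < 1:
            raise ValueError("num_workers must be >= 1")
        return num_workers
    return os.cpu_count() or 1  # cpu_count() can return None


def slow_task(i: int) -> str:
    # Hypothetical stand-in for a Prefect task: sleeps briefly and
    # reports which thread ran it.
    time.sleep(0.05)
    return threading.current_thread().name


def run_with_threads(n_tasks: int, num_workers: Optional[int] = None) -> set:
    # Analogy for LocalDaskExecutor(scheduler="threads", num_workers=...):
    # a single process with at most `workers` threads running tasks concurrently.
    workers = resolve_num_workers(num_workers)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        names = list(pool.map(slow_task, range(n_tasks)))
    return set(names)


if __name__ == "__main__":
    used = run_with_threads(n_tasks=6, num_workers=3)
    print(f"{len(used)} distinct threads used (at most 3)")
```

Passing `num_workers` explicitly makes the concurrency predictable instead of depending on how many cores the machine or pod reports.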
For more about that, check this topic: https://discourse.prefect.io/t/how-can-i-parallelize-execution-across-8-cpu-cores/103
Ruslan
03/01/2022, 12:52 PM
Anna Geller
KubernetesRun?
2. How did you configure your Kubernetes agent?
3. Do you use Prefect Cloud or Server?
4. Do you use mapping?
5. Can you share your flow or a simplified version of it?
num_workers is not about worker nodes (VMs); it controls how many processes or threads the executor uses. From the docs:
"By default the number of threads or processes used is equal to the number of cores available. This can be set explicitly by passing in num_workers."
Ruslan
03/01/2022, 1:25 PM
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Git

def register():
    flow = prefect_flow()  # builds the Flow; definition not shown
    flow.storage = Git(
        git_token_secret_name="github",
        repo="sravni/etl",
        repo_host="github.com",
        flow_path="flow/News.py",
    )
    flow.run_config = KubernetesRun(
        image="sravni.azurecr.io/dwh-prefect:latest",
        image_pull_policy="Always",
        job_template_path="k8s/job_template.yaml",
        env={"path_tmp": "/tmp/replicator/"},
    )
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
    flow.register(project_name="dwh", labels=["k8s"])
The same flow worked on a local agent. Now I'm moving to Kubernetes, and only 1 thread is working.
1. I didn't set CPU limits
2. The agent has 1 CPU
3. Cloud
4. What is mapping?
Anna Geller
with Flow("somename", executor = LocalDaskExecutor(scheduler="threads", num_workers=3)) as flow:
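The reason is that Prefect doesn't store executor information in the backend; it loads the executor from storage along with the flow. Your flow seems to be stored in a separate file (flow/News.py), so the executor assigned in register() never reaches the flow run.
Ruslan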
03/01/2022, 2:53 PM
That fixed it! Thank you, Anna!
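Below is a stdlib-only simulation of the mechanism Anna described. The `Flow` class, the flow-file strings and `load_flow_from_storage` are hypothetical stand-ins, not Prefect's API. A run rebuilds the flow by executing the flow file from storage. As a result, an executor assigned to a separate in-memory object in `register()` never reaches the run, while one passed to `Flow(...)` inside the file does. The `"LocalExecutor"` fallback mirrors Prefect 1.x, whose default executor runs tasks one at a time. That is consistent with the single thread Ruslan saw.

```python
from typing import Optional


class Flow:
    # Hypothetical stand-in for a Prefect 1.x Flow; only tracks the executor.
    def __init__(self, name: str, executor: Optional[str] = None):
        self.name = name
        self.executor = executor


# Simulated contents of the flow file kept in Git storage, before and after the fix.
FLOW_FILE_BEFORE = 'flow = Flow("news")'
FLOW_FILE_AFTER = 'flow = Flow("news", executor="LocalDaskExecutor(threads, 3)")'


def load_flow_from_storage(source: str) -> Flow:
    # Executes the flow file in a fresh namespace, like a run that rebuilds
    # the flow from storage instead of reusing the registering process.
    namespace = {"Flow": Flow}
    exec(source, namespace)
    return namespace["flow"]


def register(source: str) -> Flow:
    flow = load_flow_from_storage(source)
    # This assignment only affects the object in the registering process.
    flow.executor = "LocalDaskExecutor(threads, 3)"
    return flow


def executor_used_at_run_time(source: str) -> str:
    flow = load_flow_from_storage(source)
    # No executor defined in the file: fall back to the sequential default.
    return flow.executor or "LocalExecutor"


if __name__ == "__main__":
    print(register(FLOW_FILE_BEFORE).executor)          # LocalDaskExecutor(threads, 3)
    print(executor_used_at_run_time(FLOW_FILE_BEFORE))  # LocalExecutor
    print(executor_used_at_run_time(FLOW_FILE_AFTER))   # LocalDaskExecutor(threads, 3)
```

The takeaway matches the fix in this thread. Configure the executor where the flow itself is defined, because that file is what the run loads.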