# ask-community
o
Hello, we’re running Prefect 1 and recently upgraded from LocalDaskExecutor to DaskExecutors running on KubeClusters, with the agent running on Azure Kubernetes Service. We have a number of flows which are then run by a master flow. The master flow runs and can complete, but it doesn’t seem to be parallelising as expected: for the underlying tasks, the mapped children run one by one even though adapt_kwargs is set to a minimum of 1 and a maximum of 10 in each DaskExecutor. Is this a problem with running the flows from a master flow which also has a DaskExecutor defined?
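(For context, a rough sketch of the flow-of-flows setup being described — the flow, project and image names and the StartFlowRun wiring are placeholders, not the poster’s actual code:)
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.tasks.prefect import StartFlowRun

@task
def process(item):
    return item * 2

# Child flow: its mapped tasks are the ones expected to fan out across Dask workers
with Flow("child-flow") as child_flow:
    process.map(list(range(20)))

child_flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(image="my-registry.azurecr.io/my-image:latest")
    ),
    adapt_kwargs={"minimum": 1, "maximum": 10},
)

# Master flow: kicks off each registered child flow as its own flow run
with Flow("master-flow") as master_flow:
    StartFlowRun(flow_name="child-flow", project_name="my-project", wait=True)()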
k
This sounds like the executor did not attach. Can you show me how you attached it? It needs to be in the same file as the Flow. It won’t work if you import it from another place and then attach it
o
Thanks for your response!
The executor is defined like that in the master flow as well as in all of the other flows
k
Can you move
flow.executor = DaskExecutor(...)
out of the if block so that it’s attached? Also, you can try running a flow with DEBUG level logs and it will show the executor at the top of the logs
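(For illustration — a minimal sketch, not the poster’s file — of what “out of the if block” means: the executor has to be assigned at module level so it is evaluated whenever the flow file is loaded:)
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("child-flow") as flow:
    ...  # tasks defined here

# Module level: evaluated whenever the flow file is loaded, so the executor
# is attached for agent-launched runs as well.
flow.executor = DaskExecutor(adapt_kwargs={"minimum": 1, "maximum": 10})

if __name__ == "__main__":
    # This block only runs for direct `python flow.py` invocations; an executor
    # assigned only in here is never attached during a deployed run, which then
    # silently falls back to LocalExecutor.
    flow.run()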
o
That’s fixed it, thanks! I ran the original code with the DEBUG log levels and it was using the LocalExecutor and is now using the DaskExecutor after moving the executor definition. Thank you very much for your help!
k
Nice! This is because Executor specifically is not registered like Storage and RunConfig. It is just pulled from the Flow file during execution
o
Makes sense!
Thanks again for your help yesterday! Unfortunately, I’ve run into the problem of the new dask pod not being able to pull an image from my Azure Container Registry. I have a Kubernetes secret shared with the cluster and the agent yaml points to it so the job pod spins up fine. Not sure what’s going wrong here
k
How do you authenticate for ACR? Is it
docker login
or is there a file with credentials? I haven’t done it in a while
o
Yeah it’s through docker login
k
I believe there are two ways to authenticate and one didn’t work. We have a write-up here (though not Kubernetes specific). Is it similar to what you did?
o
I didn’t do a service principal, just used the docker login username and password options available through the ACR. The secret works fine when spinning up the job pod with the ACR image, but not when spinning up the dask pod
k
Ah I see what you are saying. Can I see the code for DaskExecutor?
o
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(image=prefect.context.image)
    ),
    adapt_kwargs={"minimum": 2, "maximum": 3},
)
k
I think you need to add the credentials to
make_pod_spec
so they are able to pull the same image right?
How did you get image into
context
?
o
flow.run_config = KubernetesRun(
    env=os.environ,
    image="sampo.azurecr.io/sampo-prefect:latest",
    labels=["Kubernetes"],
    image_pull_policy="Always",
)
Does that get the image into context? It seems to find the image OK but fails to authenticate, according to the k8s logs. It would make sense to add the credentials to make_pod_spec as well. Will try that now
k
I’m surprised if it does, but I’ll believe it. If you get the image ok but can’t authenticate, yes I think it’s about
make_pod_spec
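(A hedged sketch of what that could look like, assuming the installed dask-kubernetes version exposes extra_pod_config on make_pod_spec and that the image pull secret is named acr-secret — both assumptions, not details from the thread:)
import prefect
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow
from prefect.executors import DaskExecutor

flow = Flow("child-flow")  # placeholder for the flow defined elsewhere

flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(
            image=prefect.context.image,
            # Pod-level config merged into the Dask worker pod spec, so the
            # worker pods can pull from the private registry too.
            extra_pod_config={"imagePullSecrets": [{"name": "acr-secret"}]},
        )
    ),
    adapt_kwargs={"minimum": 2, "maximum": 3},
)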
o
Thanks for this! So, now the job pod and dask pod successfully start (although I had to additionally install kubectl in my docker container), but I’m now getting a
ConnectionError: kubectl port forward failed
error
a
Hey @Ollie Sellers, have you managed to work through the port forward error? I have the same scenario as you. Thanks!
o
Unfortunately, I wasn’t able to resolve this issue and had to use the LocalDaskExecutor. I would love to get this working though if possible
a
@Ollie Sellers I think I’ve figured it out… The default service account that comes with the Helm chart doesn’t have permission to port forward. So I’ve created three new Kubernetes resources: a ServiceAccount, a ClusterRole and a ClusterRoleBinding. Then I added the "pods/portforward" resource inside the ClusterRole resource. Finally, you have to reference the new service account inside KubernetesRun:
run_config=KubernetesRun(
    labels=["dev"],
    job_template=job_template,
    service_account_name="prefect-sa")
The kubernetes resources:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-sa
  namespace: prefect-server

---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: prefect-server
  name: allow-port-forward
# rules:
# - apiGroups: [""]
#   resources: ["pods", "pods/portforward"]
#   verbs: ["get", "list", "create"]
rules:
- apiGroups: ["batch", "extensions"]
  resources: ["jobs", "jobs/status"]
  verbs: ["*"]
- apiGroups: [""]
  resources: ["events", "pods", "pods/log", "services", "pods/portforward"]
  verbs: ["*"]
- apiGroups: ["policy"]
  resources: ["poddisruptionbudgets"]
  verbs: ["*"]

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: allow-port-forward
  namespace: prefect-server
subjects:
- kind: ServiceAccount
  name: prefect-sa
  namespace: prefect-server
roleRef:
  kind: ClusterRole
  name: allow-port-forward
  apiGroup: rbac.authorization.k8s.io
Let me know if this works for you as well. Good luck!
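(Pieced together, the flow-level configuration being described would look roughly like this — the image, labels and adapt_kwargs values are placeholders, and prefect-sa is the service account created above:)
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

with Flow("child-flow") as flow:
    ...  # tasks defined here

# The flow-run job pod runs under the service account allowed to port forward
flow.run_config = KubernetesRun(
    image="my-registry.azurecr.io/my-image:latest",
    labels=["dev"],
    service_account_name="prefect-sa",
)

flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(image="my-registry.azurecr.io/my-image:latest")
    ),
    adapt_kwargs={"minimum": 1, "maximum": 10},
)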
o
Thank you! I will try this out and let you know
t
@Ollie Sellers did it work out for you?
o
Hi @Tom Klein and @Alexandru Anghel, apologies for the delay in my reply, this fell down on the priority list so I made do with LocalDaskExecutor for a while. Interestingly, I’ve just revisited this and it seems to work without having to define the extra kubernetes resources. I’m not sure exactly why, but it must be a change to the Azure Kubernetes Service
t
The port forward thing, as far as I can tell, is a side effect of things being misconfigured. In our case it came down to versions being mismatched between Prefect, Python and Dask. Once all versions were aligned it worked (the latest versions of Dask stopped working with Prefect, but no one seems to care). However, even the term "worked" is a bit inaccurate, since there were multiple issues with tasks failing sporadically during a long run. Prefect acknowledged this as a known issue but couldn’t offer a solution other than saying maybe things would be better in 2.0
o
@Tom Klein Thanks for this further info. I’ve been ramping up the use of the DaskExecutor now with KubeCluster and have also been noticing pods failing sporadically during long/large runs. I think I will have to go back to LocalDask for now and aim to migrate to 2.0 soon. Have you also got a link to the known issue?
t
Not at the moment, sorry, but if you look at my past messages here (about Dask) you will find the link in a response that one of the Prefect people gave (it was either Kevin or Anna)
o
Ok, thanks