# prefect-server
p
Hey everyone! I am new to Prefect and am loving it so far! I have deployed Prefect Server + Agent alongside a persistent Dask cluster in K8s, using Git storage for the following flow:
```python
from prefect import task, Flow
import random
from time import sleep
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from prefect.storage import GitLab

@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1

@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1

@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y

@task(name="sum")
def list_sum(arr):
    return sum(arr)


with Flow("dask-test") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

flow.storage = GitLab(<git kwargs>)
flow.run_config = KubernetesRun(image="prefecthq/prefect:latest-python3.8")
flow.executor = DaskExecutor("tcp://dask-scheduler:8786")

flow.register(project_name="test")
```
When running this flow with a `LocalDaskExecutor`, it works fine. In the Kubernetes cluster, however, the job is never marked as finished on the Prefect server. I can see the job execute to completion on the Dask scheduler UI, but the flow run stays in a Running state indefinitely on Prefect. Any help would be greatly appreciated!
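For reference, the local run that works is just an executor swap; a minimal sketch, assuming the same `flow` defined above:
```python
# Sketch of the working local setup, assuming the same `flow` defined above.
# LocalDaskExecutor uses a local thread/process pool instead of a remote
# Dask cluster, so no scheduler address is needed.
from prefect.executors import LocalDaskExecutor

flow.executor = LocalDaskExecutor()  # defaults to a thread-based scheduler
flow.run()  # one way to execute the flow locally without registering it
```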
k
Hey @patrickd, just a couple of questions. Does your memory look healthy in the Dask UI? What do your logs end with? Have you tried running with debug level logs?
p
@Kevin Kho Memory looks fine in Dask UI - what logs are you referring to? The K8s job completes successfully, no errors in the logs for the job pod. Thanks for the quick response!
k
Prefect logs for the Flow
p
```
5 August 2021,11:19:50    agent    INFO    Submitted for execution: Job prefect-job-35ef6afb
5 August 2021,11:20:12    prefect.CloudFlowRunner    INFO    Beginning Flow run for 'dask-test'
5 August 2021,11:20:12    prefect.DaskExecutor    INFO    Connecting to an existing Dask cluster at tcp://dask-scheduler:8786
5 August 2021,11:20:13    prefect.CloudFlowRunner    INFO    Flow run RUNNING: terminal tasks are incomplete.
```
It hangs at the end here. Strangely, I can see the flow execute on the Dask cluster in between "submitted for execution" and "beginning flow".
k
I think that's expected, because "Submitted for execution" is the agent's last log and "Beginning Flow run" is the flow runner's first log. So I am thinking running the flow with DEBUG level logs might give us more info here? We've seen this in the past, just trying to get more info. Normally it was memory stuff or maybe Dask hanging, but it seems you checked that already. We're working with the Coiled team to improve this. If you haven't done it before, pass `PREFECT__LOGGING__LEVEL: "DEBUG"` to your RunConfig, like `RunConfig(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})`. Or you can do this through the UI when you click the Flow: go to Advanced Config and add the env variable.
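For example, a minimal sketch assuming the KubernetesRun config from the flow above:
```python
# Hedged sketch: the run config from the flow above, with DEBUG logging
# turned on via an environment variable on the flow-run job.
from prefect.run_configs import KubernetesRun

flow.run_config = KubernetesRun(
    image="prefecthq/prefect:latest-python3.8",
    env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
)
```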
p
```
5 August 2021,11:32:05    agent    INFO    Submitted for execution: Job prefect-job-4cad06a8
5 August 2021,11:32:25    prefect.CloudFlowRunner    INFO    Beginning Flow run for 'dask-test'
5 August 2021,11:32:25    prefect.CloudFlowRunner    DEBUG    Using executor type DaskExecutor
5 August 2021,11:32:25    prefect.CloudFlowRunner    DEBUG    Flow 'dask-test': Handling state change from Scheduled to Running
5 August 2021,11:32:25    prefect.DaskExecutor    INFO    Connecting to an existing Dask cluster at tcp://dask-scheduler:8786
5 August 2021,11:32:26    prefect.CloudFlowRunner    DEBUG    Checking flow run state...
5 August 2021,11:32:26    prefect.CloudFlowRunner    INFO    Flow run RUNNING: terminal tasks are incomplete.
```
Here are the logs at debug level. To double check, I took a look at the logs for the Dask workers and found this:
```
prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. See `prefect auth login --help`.
[2021-08-05 16:35:13+0000] INFO - prefect.CloudTaskRunner | Task 'inc': Finished task run for task with final state: 'Pending'
```
I'm using server backend - is Prefect Cloud required to run a persistent Dask cluster in kubernetes?
k
I don’t think so. Will ask the team about this.
p
Ok, thanks for your help!
@Kevin Kho Figured it out - config issue with the Dask Helm cluster: the worker pods were configured to point at Cloud instead of Server. Silly mistake on my part! If anyone comes across this issue, deploying this YAML file:
```yaml
worker:
  env:
  - name: EXTRA_PIP_PACKAGES
    value: prefect --upgrade
  - name: PREFECT__BACKEND
    value: server
  - name: PREFECT__SERVER__HOST
    value: http://prefect-apollo
```
with your Dask helm deployment will fix it!
Thanks for your help
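A quick way to confirm the workers picked up these settings is to check the loaded config from Python inside a worker pod (a minimal sketch, assuming Prefect 0.15.x in the worker image):
```python
# Run inside a Dask worker pod to confirm it will call back to Server, not Cloud.
# The expected values are what the helm config above should produce.
import prefect

print(prefect.config.backend)      # expect "server"
print(prefect.config.server.host)  # expect "http://prefect-apollo"
```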
k
Thanks for the update. How did you diagnose that? But the logs were showing in Server, right? Kinda surprised you still got logs… or is that because the scheduler side was configured right?
p
I shelled into a Dask worker pod and ran `prefect diagnostics`, which gave
```json
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-glibc2.10",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.3",
    "python_version": "3.8.0"
  }
}
```
so I added the `PREFECT__BACKEND` variable to my custom dask-helm.yaml. Once I did that, I redeployed the Helm cluster with `helm install dask -f kubernetes/dask-helm.yaml dask/dask`
and tried rerunning the job. It failed again, but from the worker logs it was looking for the GraphQL API at localhost:4200. I added the host env var to dask-helm.yaml, redeployed, and it worked. The final `prefect diagnostics` gave
```json
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__SERVER__HOST",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-glibc2.10",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.0"
  }
}
```
Pretty sure the reason I got logs was that the Prefect job pod was able to communicate with the Dask cluster as usual. When task states needed to be updated, though, the Dask workers attempted to contact Prefect Cloud instead of calling back to the server, so the flow run stayed running indefinitely.
k
Ah I see. That makes sense. Thanks for all the info!
👍 1
s
@patrickd what was the host env you added? I think I’m running into this same issue.
k
He was referring to this right?
```yaml
worker:
  env:
  - name: EXTRA_PIP_PACKAGES
    value: prefect --upgrade
  - name: PREFECT__BACKEND
    value: server
  - name: PREFECT__SERVER__HOST
    value: http://prefect-apollo
```
s
Yep, I added that and confirmed that the diagnostics changed, but still have issues... I can connect when running directly from Python, but can't if running the flow from the locally hosted server. Alas, it is late and I will make a new thread tomorrow.
👍 1
p
@Sam Werbalowsky what does your environment look like? Are you running in k8s?
s
I was running a local agent and the local Prefect Server UI using docker compose.
Don't have a K8s deployment set up yet.
p
so local prefect server running in docker with a Dask cluster also running in docker?
s
I either end up with a timeout connecting to the Dask cluster or it just sits in Pending doing nothing. I will post more details later.
ah, Dask is deployed on K8s
I was able to verify the backend was server by exec-ing into the worker pod.
p
are you using kubectl port-forward on the dask cluster?
sounds like a networking issue to me
s
yes, I used port forward and attempted with both localhost (which doesn't seem like it would work if being run on the K8s worker) and the Amazon EC2 ELB. I will check with my devops guy if it's networking. I'm not super knowledgeable about K8s
although why would I be able to run it from the command line with `python myflow.py`, but not from the Prefect Server UI?
p
it works from `python myflow.py`?
s
correct
but not if I run `prefect server start`, register the flow, then try to run it.
p
when you get a chance, dump me the logs from a Dask worker pod after running the flow from the web UI, and the output from `prefect diagnostics` for the pod and your local machine
👀 1
s
This is the `prefect diagnostics` output from the worker -
```json
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__SERVER__HOST",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-4.9.0-13-amd64-x86_64-with-glibc2.10",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.3"
  }
}
```
and from my local machine
```json
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "macOS-10.14.6-x86_64-i386-64bit",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.10"
  }
}
```
there aren’t any logs being generated in the worker except the basic info stuff
```
distributed.nanny - INFO -         Start Nanny at: 'tls://100.96.4.126:35683'
distributed.worker - INFO -       Start worker at:   tls://100.96.4.126:44803
distributed.worker - INFO -          Listening to:   tls://100.96.4.126:44803
distributed.worker - INFO -          dashboard at:          100.96.4.126:8787
distributed.worker - INFO - Waiting to connect to: tls://dask-783b0af74c35426e980a226efc51dd26.data-common:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                    2.15 GB
distributed.worker - INFO -       Local Directory: /home/dask/dask-worker-space/dask-worker-space/worker-jdtltre4
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tls://dask-783b0af74c35426e980a226efc51dd26.data-common:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
```
so yeah, I’d agree it looks like a networking thing
I am using the `dask-gateway-server` helm chart and a custom image that installs prefect on the workers
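For reference, one way to point a DaskExecutor at a Gateway-managed cluster (a hedged sketch with a placeholder gateway address and cluster name, not necessarily the exact setup used here):
```python
# Hedged sketch: connect Prefect's DaskExecutor to an existing Dask Gateway
# cluster. The gateway address and cluster name below are placeholders.
from dask_gateway import Gateway
from prefect.executors import DaskExecutor

gateway = Gateway("<MY_GATEWAY>")              # placeholder gateway address
cluster = gateway.connect("<CLUSTER_NAME>")    # placeholder existing cluster name
flow.executor = DaskExecutor(
    address=cluster.scheduler_address,             # tls:// scheduler address
    client_kwargs={"security": cluster.security},  # TLS creds issued by the gateway
)
```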
prefect UI logs
```
10 August 2021,03:18:06    agent    INFO    Submitted for execution: PID: 11033
10 August 2021,03:18:10    prefect.CloudFlowRunner    INFO    Beginning Flow run for 'dask-example-local-dask'
10 August 2021,03:18:10    prefect.CloudFlowRunner    DEBUG    Using executor type DaskExecutor
10 August 2021,03:18:10    prefect.CloudFlowRunner    DEBUG    Flow 'dask-example-local-dask': Handling state change from Scheduled to Running
10 August 2021,03:18:10    prefect.DaskExecutor    INFO    Connecting to an existing Dask cluster at gateway://<MY_GATEWAY>
10 August 2021,03:18:12    prefect.CloudFlowRunner    DEBUG    Checking flow run state...
10 August 2021,03:18:12    prefect.CloudFlowRunner    INFO    Flow run RUNNING: terminal tasks are incomplete.
```
ah ok - seeing tons of connection errors in the pod now that I made a new cluster and attempted to connect to that rather than make a new one - tons of stuff like this
```
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py", line 719, in urlopen
    retries = retries.increment(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/util/retry.py", line 436, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-apollo', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f66118de190>: Failed to establish a new connection: [Errno -2] Name or service not known'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 1721, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 543, in graphql
    result = self.post(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 446, in post
    response = self._request(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 710, in _request
    response = self._send_request(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 596, in _send_request
    response = session.post(
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefect-apollo', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f66118de190>: Failed to establish a new connection: [Errno -2] Name or service not known'))
```
p
yeah, that's kinda what I thought the issue would be. The pod is trying to dial back to http://prefect-apollo:4200/graphql inside the kube cluster - your Dask cluster might not be able to hit your local machine from inside the kube cluster. You could try switching that to http://localhost in the yaml file, but not sure if that'll work
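A quick way to confirm that from inside a worker pod is to hit the GraphQL endpoint directly (a hedged sketch; swap the host for whatever the worker's `PREFECT__SERVER__HOST` points at):
```python
# Hedged sketch: check whether a Dask worker pod can reach the Prefect Server
# GraphQL API at all. The host below is an assumed in-cluster Apollo address.
import requests

resp = requests.post(
    "http://prefect-apollo:4200/graphql",
    json={"query": "{ __typename }"},   # trivial query any GraphQL endpoint answers
    timeout=5,
)
print(resp.status_code, resp.text)
```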
s
yeah, I feel like it's not going to be able to connect back. So it seems like it's not really possible, without some workarounds, to use a Prefect server on my laptop and then run a flow on a remote Dask cluster. But if I can run and test via Python, that should be fine since there's no real need to orchestrate locally. I do think it's weird it works through the command line but not the server, but that's gonna be a problem for another day 😄. Appreciate you looking through those logs and advising.
n
Hi @patrickd, I am facing a similar issue. This thread helped a lot in narrowing down the issue. Do you think setting these environment variables in my `Dockerfile` would work?
```
PREFECT__BACKEND="server"
PREFECT__SERVER__HOST
PREFECT__SERVER__PORT
PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.executors.DaskExecutor"
PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS
```
p
@Nivi Mukka What does your architecture look like? Running both dask and prefect in k8s?
n
Yes, both Dask and Prefect are setup on a GKE cluster.
p
yep! just make sure that the backend is set to server in your Dask worker pods and set `PREFECT__SERVER__HOST` to `http://<prefect apollo service name>`, and you should be set. Are you deploying with Helm?
👍 1
n
Okay, will give that a shot. No, not using helm. Just a Dockerfile with .py scripts for deployment.