
patrickd

08/05/2021, 2:57 PM
Hey everyone! I am new to Prefect and am loving it so far! I have deployed Prefect Server + Agent alongside a persistent Dask cluster in K8s, using Git storage for the following flow:
from prefect import task, Flow
import random
from time import sleep
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from prefect.storage import GitLab

@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1

@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1

@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y

@task(name="sum")
def list_sum(arr):
    return sum(arr)


with Flow("dask-test") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

flow.storage = GitLab(<git kwargs>)
flow.run_config = KubernetesRun(image="prefecthq/prefect:latest-python3.8")
flow.executor = DaskExecutor("tcp://dask-scheduler:8786")

flow.register(project_name="test")
When running this flow with a LocalDaskExecutor, it works fine. In the Kubernetes cluster, however, the job is never marked as finished on the Prefect server. I can see the job execute to completion on the Dask scheduler UI, but the flow run hangs indefinitely in Prefect. Any help would be greatly appreciated!

Kevin Kho

08/05/2021, 3:22 PM
Hey @patrickd, just a couple of questions. Does your memory look healthy in the Dask UI? What do your logs end with? Have you tried running with debug level logs?

patrickd

08/05/2021, 3:48 PM
@Kevin Kho Memory looks fine in Dask UI - what logs are you referring to? The K8s job completes successfully, no errors in the logs for the job pod. Thanks for the quick response!

Kevin Kho

08/05/2021, 3:56 PM
Prefect logs for the Flow

patrickd

08/05/2021, 4:23 PM
5 August 2021,11:19:50 	agent	INFO	Submitted for execution: Job prefect-job-35ef6afb
5 August 2021,11:20:12 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'dask-test'
5 August 2021,11:20:12 	prefect.DaskExecutor	INFO	Connecting to an existing Dask cluster at tcp://dask-scheduler:8786
5 August 2021,11:20:13 	prefect.CloudFlowRunner	INFO	Flow run RUNNING: terminal tasks are incomplete.
It hangs at the end here. Strangely, I can see the flow execute on the Dask cluster in between "Submitted for execution" and "Beginning Flow run".

Kevin Kho

08/05/2021, 4:29 PM
I think that’s expected because "Submitted for execution" is the agent’s log and "Beginning Flow run" is the flow runner’s log. So I am thinking running the flow with DEBUG level logs might give us more info here? We’ve seen this in the past, just trying to get more info. Normally it was memory stuff or maybe Dask hanging, but it seems you checked that already. We’re working with the Coiled team to improve this. If you haven’t done it before, pass PREFECT__LOGGING__LEVEL: "DEBUG" to your run config, e.g. `KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})`. Or you can do this through the UI when you click the Flow: go to Advanced Config and add the env variable.
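For example, something like this in the flow script should do it (just a sketch reusing the image from your snippet):
from prefect.run_configs import KubernetesRun

# Sketch: pass the logging env var through the run config so the flow
# runner pod emits DEBUG-level Prefect logs.
flow.run_config = KubernetesRun(
    image="prefecthq/prefect:latest-python3.8",
    env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
)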

patrickd

08/05/2021, 4:39 PM
5 August 2021,11:32:05 	agent	INFO	Submitted for execution: Job prefect-job-4cad06a8
5 August 2021,11:32:25 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'dask-test'
5 August 2021,11:32:25 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
5 August 2021,11:32:25 	prefect.CloudFlowRunner	DEBUG	Flow 'dask-test': Handling state change from Scheduled to Running
5 August 2021,11:32:25 	prefect.DaskExecutor	INFO	Connecting to an existing Dask cluster at tcp://dask-scheduler:8786
5 August 2021,11:32:26 	prefect.CloudFlowRunner	DEBUG	Checking flow run state...
5 August 2021,11:32:26 	prefect.CloudFlowRunner	INFO	Flow run RUNNING: terminal tasks are incomplete.
Here are the logs at debug level. To double-check, I took a look at the logs for the Dask workers and found this:
prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. See `prefect auth login --help`.
[2021-08-05 16:35:13+0000] INFO - prefect.CloudTaskRunner | Task 'inc': Finished task run for task with final state: 'Pending'
I'm using the server backend - is Prefect Cloud required to run flows on a persistent Dask cluster in Kubernetes?

Kevin Kho

08/05/2021, 4:43 PM
I don’t think so. Will ask the team about this.

patrickd

08/05/2021, 4:49 PM
Ok, thanks for your help!
@Kevin Kho Figured it out - it was a config issue with the Dask helm cluster: the worker pods were configured to use Cloud instead of Server. Silly mistake on my part! If anyone comes across this issue, deploying this yaml file:
worker:
  env:
  - name: EXTRA_PIP_PACKAGES
    value: prefect --upgrade
  - name: PREFECT__BACKEND
    value: server
  - name: PREFECT__SERVER__HOST
    value: http://prefect-apollo
with your Dask helm deployment will fix it!
Thanks for your help

Kevin Kho

08/05/2021, 5:49 PM
Thanks for the update. How did you diagnose that? But the logs were showing in Server, right? Kinda surprised you still got logs…or is that because the scheduler was configured right?

patrickd

08/05/2021, 6:22 PM
I shelled into a Dask worker pod and ran prefect diagnostics, which gave
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-glibc2.10",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.3",
    "python_version": "3.8.0"
  }
}
so I added the PREFECT__BACKEND variable to my custom dask-helm.yaml. Once I did that, I redeployed the helm cluster with
helm install dask -f kubernetes/dask-helm.yaml dask/dask
and tried rerunning the job. It failed again, but from the worker logs it was looking for the GraphQL API at localhost:4200. I added the host env var to dask-helm.yaml, redeployed, and it worked. The final prefect diagnostics gave
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__SERVER__HOST",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-glibc2.10",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.0"
  }
}
Pretty sure the reason I got logs was that the Prefect job pod was able to communicate with the Dask cluster as usual. When the task runs needed to be updated, the Dask workers attempted to contact Prefect Cloud instead of calling back to the server, so the flow run stayed running indefinitely.

Kevin Kho

08/05/2021, 6:28 PM
Ah I see. That makes sense. Thanks for all the info!
👍 1

Sam Werbalowsky

08/10/2021, 2:35 AM
@patrickd what was the host env you added? I think I’m running into this same issue.

Kevin Kho

08/10/2021, 3:07 AM
He was referring to this right?
worker:
  env:
  - name: EXTRA_PIP_PACKAGES
    value: prefect --upgrade
  - name: PREFECT__BACKEND
    value: server
  - name: PREFECT__SERVER__HOST
    value: http://prefect-apollo

Sam Werbalowsky

08/10/2021, 3:13 AM
Yep, I added that and confirmed that the diagnostics changed, but still have issues... I can connect when running directly from Python, but can't when running the flow from the locally hosted server. Alas, it is late and I will make a new thread tomorrow.
👍 1

patrickd

08/10/2021, 3:29 PM
@Sam Werbalowsky what does your environment look like? Are you running in k8s?

Sam Werbalowsky

08/10/2021, 3:30 PM
I was running a local agent and the local Prefect Server UI using docker compose.
Don’t have K8s deployment set up yet.

patrickd

08/10/2021, 3:31 PM
so local prefect server running in docker with a Dask cluster also running in docker?

Sam Werbalowsky

08/10/2021, 3:31 PM
I either end up with a timeout connecting to the Dask cluster or it just sits in Pending doing nothing. I will post more details later.
ah, Dask is deployed on K8s
I was able to verify the backend was server by exec-ing into the worker pod.

patrickd

08/10/2021, 3:32 PM
are you using kubectl port-forward on the dask cluster?
sounds like a networking issue to me

Sam Werbalowsky

08/10/2021, 3:32 PM
yes, I used port-forward and attempted with both localhost (which doesn’t seem like it would work if being run on the k8s worker) and the Amazon EC2 ELB. I will check with my devops guy if it’s networking. I’m not super knowledgeable about K8s.
although why would I be able to run it from the command line with python myflow.py but not from the Prefect Server UI?

patrickd

08/10/2021, 3:34 PM
it works from python myflow.py?

Sam Werbalowsky

08/10/2021, 3:34 PM
correct
but not if I run prefect server start, register the flow, then try to run it.

patrickd

08/10/2021, 3:37 PM
when you get a chance, dump me the logs from a Dask worker pod after running the flow from the web UI, plus the output of prefect diagnostics for both the pod and your local machine
👀 1

Sam Werbalowsky

08/10/2021, 7:07 PM
This is the prefect diagnostics output from the worker -
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__SERVER__HOST",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-4.9.0-13-amd64-x86_64-with-glibc2.10",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.3"
  }
}
and from my local machine
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "macOS-10.14.6-x86_64-i386-64bit",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.10"
  }
}
there aren’t any logs being generated in the worker except the basic info stuff
distributed.nanny - INFO -         Start Nanny at: 'tls://100.96.4.126:35683'
distributed.worker - INFO -       Start worker at:   tls://100.96.4.126:44803
distributed.worker - INFO -          Listening to:   tls://100.96.4.126:44803
distributed.worker - INFO -          dashboard at:          100.96.4.126:8787
distributed.worker - INFO - Waiting to connect to: tls://dask-783b0af74c35426e980a226efc51dd26.data-common:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                    2.15 GB
distributed.worker - INFO -       Local Directory: /home/dask/dask-worker-space/dask-worker-space/worker-jdtltre4
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tls://dask-783b0af74c35426e980a226efc51dd26.data-common:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
so yeah, I’d agree it looks like a networking thing
I am using the dask-gateway-server helm chart and a custom image that installs prefect on the workers
prefect UI logs
10 August 2021,03:18:06 	agent	INFO	Submitted for execution: PID: 11033
10 August 2021,03:18:10 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'dask-example-local-dask'
10 August 2021,03:18:10 	prefect.CloudFlowRunner	DEBUG	Using executor type DaskExecutor
10 August 2021,03:18:10 	prefect.CloudFlowRunner	DEBUG	Flow 'dask-example-local-dask': Handling state change from Scheduled to Running
10 August 2021,03:18:10 	prefect.DaskExecutor	INFO	Connecting to an existing Dask cluster at gateway://<MY_GATEWAY>
10 August 2021,03:18:12 	prefect.CloudFlowRunner	DEBUG	Checking flow run state...
10 August 2021,03:18:12 	prefect.CloudFlowRunner	INFO	Flow run RUNNING: terminal tasks are incomplete.
ah ok - seeing tons of connection errors in the pod now that I made a cluster ahead of time and attempted to connect to that rather than have the flow create one - tons of stuff like this
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py", line 719, in urlopen
    retries = retries.increment(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/util/retry.py", line 436, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-apollo', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f66118de190>: Failed to establish a new connection: [Errno -2] Name or service not known'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 1721, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 543, in graphql
    result = self.post(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 446, in post
    response = self._request(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 710, in _request
    response = self._send_request(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 596, in _send_request
    response = session.post(
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefect-apollo', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f66118de190>: Failed to establish a new connection: [Errno -2] Name or service not known'))

patrickd

08/10/2021, 8:01 PM
yeah, that's kinda what I thought the issue would be. The pod is trying to dial back to http://prefect-apollo:4200/graphql inside the kube cluster - since your Prefect Server is running on your laptop rather than in the cluster, the Dask workers can't resolve that service name or reach your local machine. You could try switching that to http://localhost in the yaml file, but I'm not sure that'll work
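If it helps, here's a rough, untested check you could run from inside a worker pod to see whether it can reach the Apollo GraphQL endpoint at all (swap the URL for wherever your server is actually reachable from the cluster):
import requests

# Hypothetical connectivity check: POST a trivial GraphQL query to the
# endpoint the workers are configured to call back to.
url = "http://prefect-apollo:4200/graphql"
try:
    resp = requests.post(url, json={"query": "query { hello }"}, timeout=5)
    print(resp.status_code, resp.text[:200])
except requests.exceptions.RequestException as exc:
    print(f"Cannot reach {url}: {exc}")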

Sam Werbalowsky

08/10/2021, 8:15 PM
yeah, I feel like it’s not going to be able to connect back. So it seems like it’s not really possible, without some workarounds or additional configuration, to use a Prefect Server on my laptop and run a flow on a remote Dask cluster. But if I can run and test via python, that should be fine since there’s no real need to orchestrate locally. I do think it’s weird that it works through the command line but not the server, but that’s gonna be a problem for another day 😄. Appreciate you looking through those logs and advising.

Nivi Mukka

08/13/2021, 1:31 AM
Hi @patrickd, I am facing a similar issue. This thread helped a lot in narrowing it down. Do you think setting these environment variables in my Dockerfile would work?
PREFECT__BACKEND="server"
PREFECT__SERVER__HOST
PREFECT__SERVER__PORT
PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.executors.DaskExecutor"
PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS

patrickd

08/13/2021, 2:03 AM
@Nivi Mukka What does your architecture look like? Running both dask and prefect in k8s?

Nivi Mukka

08/13/2021, 2:03 AM
Yes, both Dask and Prefect are set up on a GKE cluster.

patrickd

08/13/2021, 2:06 AM
yep! just make sure that the backend is set to server in your Dask worker pods, set PREFECT__SERVER__HOST to http://<prefect apollo service name>, and you should be set. Are you deploying with helm?
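If you want to double-check from inside the container, a quick sanity check could look something like this (attribute paths assume the standard Prefect 0.15.x config layout, so treat it as a sketch):
import prefect

# These should reflect the env vars baked into the image
# (PREFECT__BACKEND, PREFECT__SERVER__HOST, PREFECT__SERVER__PORT).
print(prefect.config.backend)      # expect "server"
print(prefect.config.server.host)  # expect your Apollo service URL
print(prefect.config.server.port)  # expect 4200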
👍 1

Nivi Mukka

08/13/2021, 2:13 AM
Okay, will give that a shot. No, not using helm. Just a Dockerfile with .py scripts for deployment.