patrickd
08/05/2021, 2:57 PM
from prefect import task, Flow
import random
from time import sleep
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from prefect.storage import GitLab

@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1

@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1

@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y

@task(name="sum")
def list_sum(arr):
    return sum(arr)

with Flow("dask-test") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)

flow.storage = GitLab(<git kwargs>)
flow.run_config = KubernetesRun(image="prefecthq/prefect:latest-python3.8")
flow.executor = DaskExecutor("tcp://dask-scheduler:8786")
flow.register(project_name="test")
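Apart from the random sleeps, this mapped pipeline is deterministic: each `add` receives `(x + 1) + (x - 1) = 2x`, so the terminal `sum` task should return 2 × (0 + 1 + ... + 99) = 9900. A plain-Python sketch (no Prefect; the function names only mirror the tasks above) gives a reference value to compare a finished run against:

```python
# Plain-Python reference for what the "dask-test" flow computes (no Prefect involved).

def inc(x: int) -> int:
    return x + 1


def dec(x: int) -> int:
    return x - 1


def add(x: int, y: int) -> int:
    return x + y


def reference_total(n: int = 100) -> int:
    incs = [inc(x) for x in range(n)]               # mirrors inc.map(x=range(n))
    decs = [dec(x) for x in range(n)]               # mirrors dec.map(x=range(n))
    adds = [add(x, y) for x, y in zip(incs, decs)]  # mirrors add.map(x=incs, y=decs)
    return sum(adds)                                # mirrors list_sum(adds)


if __name__ == "__main__":
    print(reference_total())  # 9900
```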
When running this flow with a LocalDaskExecutor, it works fine. In the Kubernetes cluster, however, the job is never marked as finished on the Prefect server. I can see the job run to completion in the Dask scheduler UI, but the flow run stays in a Running state indefinitely in Prefect. Any help would be greatly appreciated!
Kevin Kho
08/05/2021, 3:22 PM
patrickd
08/05/2021, 3:48 PM
Kevin Kho
08/05/2021, 3:56 PM
patrickd
08/05/2021, 4:23 PM
5 August 2021,11:19:50 agent INFO Submitted for execution: Job prefect-job-35ef6afb
5 August 2021,11:20:12 prefect.CloudFlowRunner INFO Beginning Flow run for 'dask-test'
5 August 2021,11:20:12 prefect.DaskExecutor INFO Connecting to an existing Dask cluster at tcp://dask-scheduler:8786
5 August 2021,11:20:13 prefect.CloudFlowRunner INFO Flow run RUNNING: terminal tasks are incomplete.
It hangs at the end here. Strangely, I can see the flow execute on the Dask cluster between "Submitted for execution" and "Beginning Flow run".
Kevin Kho
08/05/2021, 4:29 PM
Can you add PREFECT__LOGGING__LEVEL: "DEBUG" to your RunConfig, like `KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})`? Or you can do this through the UI: click on the flow, go to Advanced Config, and add the env variable there.
patrickd
08/05/2021, 4:39 PM
5 August 2021,11:32:05 agent INFO Submitted for execution: Job prefect-job-4cad06a8
5 August 2021,11:32:25 prefect.CloudFlowRunner INFO Beginning Flow run for 'dask-test'
5 August 2021,11:32:25 prefect.CloudFlowRunner DEBUG Using executor type DaskExecutor
5 August 2021,11:32:25 prefect.CloudFlowRunner DEBUG Flow 'dask-test': Handling state change from Scheduled to Running
5 August 2021,11:32:25 prefect.DaskExecutor INFO Connecting to an existing Dask cluster at tcp://dask-scheduler:8786
5 August 2021,11:32:26 prefect.CloudFlowRunner DEBUG Checking flow run state...
5 August 2021,11:32:26 prefect.CloudFlowRunner INFO Flow run RUNNING: terminal tasks are incomplete.
Here are the logs at debug level. To double-check, I took a look at the logs for the Dask workers and found this:
prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. See `prefect auth login --help`.
[2021-08-05 16:35:13+0000] INFO - prefect.CloudTaskRunner | Task 'inc': Finished task run for task with final state: 'Pending'
I'm using the Server backend. Is Prefect Cloud required to run a persistent Dask cluster in Kubernetes?
Kevin Kho
08/05/2021, 4:43 PM
patrickd
08/05/2021, 4:49 PM
worker:
  env:
    - name: EXTRA_PIP_PACKAGES
      value: prefect --upgrade
    - name: PREFECT__BACKEND
      value: server
    - name: PREFECT__SERVER__HOST
      value: http://prefect-apollo
with your Dask Helm deployment will fix it!
Kevin Kho
08/05/2021, 5:49 PM
patrickd
08/05/2021, 6:22 PM
I ran `prefect diagnostics`, which gave:
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-glibc2.10",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.3",
    "python_version": "3.8.0"
  }
}
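The worker's diagnostics report `"prefect_backend": "cloud"` with no `PREFECT__` env vars set, so the worker processes were still on the default Cloud backend even though the flow was registered against Server (the `prefect backend server` CLI command only changes the config on the machine where it is run). Prefect 1.x treats environment variables of the form `PREFECT__SECTION__KEY` as overrides for the config key `section.key`. A simplified, stdlib-only sketch of that naming convention; it skips the type casting and interpolation the real loader performs, and treats `"cloud"` as the default backend, as the output above shows:

```python
import os
from typing import Any, Dict, Mapping, Optional

PREFIX = "PREFECT__"


def env_overrides(env: Mapping[str, str]) -> Dict[str, Any]:
    """Simplified sketch: turn PREFECT__SECTION__KEY=value into {"section": {"key": value}}.

    Unlike Prefect's real loader, values stay raw strings (no casting or interpolation).
    """
    config: Dict[str, Any] = {}
    for name, value in env.items():
        if not name.startswith(PREFIX):
            continue  # ignore unrelated variables such as PATH
        keys = [part.lower() for part in name[len(PREFIX):].split("__")]
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})  # create nested sections as needed
        node[keys[-1]] = value
    return config


def effective_backend(env: Optional[Mapping[str, str]] = None) -> str:
    """Backend a process would use under this sketch: "cloud" unless overridden."""
    overrides = env_overrides(os.environ if env is None else env)
    return overrides.get("backend", "cloud")
```

Under this convention, `PREFECT__LOGGING__LEVEL=DEBUG` maps to `logging.level` and `PREFECT__BACKEND=server` maps to `backend`, which is why the variable has to be present in the worker pods' own environment.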
So I added the PREFECT__BACKEND variable to my custom dask-helm.yaml. Once I did that, I redeployed the Helm chart with
helm install dask -f kubernetes/dask-helm.yaml dask/dask
and tried rerunning the job. It failed again, but the worker logs showed it was looking for the GraphQL API at localhost:4200. I added the host env var (PREFECT__SERVER__HOST) to dask-helm.yaml, redeployed, and it worked. The final `prefect diagnostics` output was:
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__SERVER__HOST",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-glibc2.10",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.0"
  }
}
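Setting only `PREFECT__BACKEND` left the API address at its default, which is why the workers then went looking for `localhost:4200`. A minimal sketch of how the worker-side URL depends on `PREFECT__SERVER__HOST` and `PREFECT__SERVER__PORT`; the defaults and the `/graphql` path are assumptions based on the addresses reported in this thread, not copied from Prefect's source, and `graphql_url` is a hypothetical helper:

```python
from typing import Mapping

# Assumed defaults, inferred from this thread: with no overrides the worker
# looked for the API at localhost:4200.
DEFAULT_HOST = "http://localhost"
DEFAULT_PORT = "4200"


def graphql_url(env: Mapping[str, str]) -> str:
    """Sketch of how the API URL follows from PREFECT__SERVER__HOST and PREFECT__SERVER__PORT."""
    host = env.get("PREFECT__SERVER__HOST", DEFAULT_HOST).rstrip("/")
    port = env.get("PREFECT__SERVER__PORT", DEFAULT_PORT)
    return f"{host}:{port}/graphql"
```

With the Helm values posted earlier, `graphql_url({"PREFECT__SERVER__HOST": "http://prefect-apollo"})` gives `http://prefect-apollo:4200/graphql`, which only works if that hostname resolves from inside the worker pods.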
Kevin Kho
08/05/2021, 6:28 PM
Sam Werbalowsky
08/10/2021, 2:35 AM
Kevin Kho
08/10/2021, 3:07 AM
worker:
  env:
    - name: EXTRA_PIP_PACKAGES
      value: prefect --upgrade
    - name: PREFECT__BACKEND
      value: server
    - name: PREFECT__SERVER__HOST
      value: http://prefect-apollo
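Before rerunning a flow, it can help to confirm the worker pods actually received these variables. A hypothetical pre-flight helper (`check_worker_env` is not part of Prefect) that flags the two misconfigurations hit earlier in this thread: a missing Server backend, and an API host that is unset or points at localhost:

```python
import os
from typing import List, Mapping, Optional
from urllib.parse import urlparse


def check_worker_env(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Hypothetical check: list config problems that would stop a Dask worker
    from reporting task states to a Prefect Server API."""
    env = os.environ if env is None else env
    problems: List[str] = []
    if env.get("PREFECT__BACKEND") != "server":
        problems.append("PREFECT__BACKEND is not 'server' (worker falls back to Cloud)")
    host = env.get("PREFECT__SERVER__HOST")
    if not host:
        problems.append("PREFECT__SERVER__HOST is unset (API defaults to localhost)")
    else:
        parsed = urlparse(host)
        if parsed.scheme not in ("http", "https"):
            problems.append("PREFECT__SERVER__HOST should start with http:// or https://")
        elif parsed.hostname in ("localhost", "127.0.0.1"):
            problems.append("PREFECT__SERVER__HOST points at localhost, not the API service")
    return problems
```

Called with no argument inside a worker pod, an empty list means both settings look right; it does not test whether the host is actually reachable.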
Sam Werbalowsky
08/10/2021, 3:13 AM
patrickd
08/10/2021, 3:29 PM
Sam Werbalowsky
08/10/2021, 3:30 PM
patrickd
08/10/2021, 3:31 PM
Sam Werbalowsky
08/10/2021, 3:31 PM
patrickd
08/10/2021, 3:32 PM
Sam Werbalowsky
08/10/2021, 3:32 PM
`python myflow.py` but not from the Prefect Server UI?
patrickd
08/10/2021, 3:34 PM
`python myflow.py`?
Sam Werbalowsky
08/10/2021, 3:34 PM
`prefect server start`, register the flow, then try to run it.
patrickd
08/10/2021, 3:37 PM
`prefect diagnostics` for the pod and your local machine
Sam Werbalowsky
08/10/2021, 7:07 PM
`prefect diagnostics` from the worker:
{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__SERVER__HOST",
    "PREFECT__BACKEND"
  ],
  "system_information": {
    "platform": "Linux-4.9.0-13-amd64-x86_64-with-glibc2.10",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.3"
  }
}
and from my local machine:
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "macOS-10.14.6-x86_64-i386-64bit",
    "prefect_backend": "server",
    "prefect_version": "0.15.3",
    "python_version": "3.8.10"
  }
}
There aren't any logs being generated in the worker except the basic INFO messages:
distributed.nanny - INFO - Start Nanny at: 'tls://100.96.4.126:35683'
distributed.worker - INFO - Start worker at: tls://100.96.4.126:44803
distributed.worker - INFO - Listening to: tls://100.96.4.126:44803
distributed.worker - INFO - dashboard at: 100.96.4.126:8787
distributed.worker - INFO - Waiting to connect to: tls://dask-783b0af74c35426e980a226efc51dd26.data-common:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 2.15 GB
distributed.worker - INFO - Local Directory: /home/dask/dask-worker-space/dask-worker-space/worker-jdtltre4
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tls://dask-783b0af74c35426e980a226efc51dd26.data-common:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
So yeah, I'd agree it looks like a networking thing.
dask-gateway-server Helm chart and using a custom image that installs Prefect on the workers.
10 August 2021,03:18:06 agent INFO Submitted for execution: PID: 11033
10 August 2021,03:18:10 prefect.CloudFlowRunner INFO Beginning Flow run for 'dask-example-local-dask'
10 August 2021,03:18:10 prefect.CloudFlowRunner DEBUG Using executor type DaskExecutor
10 August 2021,03:18:10 prefect.CloudFlowRunner DEBUG Flow 'dask-example-local-dask': Handling state change from Scheduled to Running
10 August 2021,03:18:10 prefect.DaskExecutor INFO Connecting to an existing Dask cluster at gateway://<MY_GATEWAY>
10 August 2021,03:18:12 prefect.CloudFlowRunner DEBUG Checking flow run state...
10 August 2021,03:18:12 prefect.CloudFlowRunner INFO Flow run RUNNING: terminal tasks are incomplete.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py", line 719, in urlopen
    retries = retries.increment(
  File "/opt/conda/lib/python3.8/site-packages/urllib3/util/retry.py", line 436, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-apollo', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f66118de190>: Failed to establish a new connection: [Errno -2] Name or service not known'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 1721, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 543, in graphql
    result = self.post(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 446, in post
    response = self._request(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 710, in _request
    response = self._send_request(
  File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 596, in _send_request
    response = session.post(
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefect-apollo', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f66118de190>: Failed to establish a new connection: [Errno -2] Name or service not known'))
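`[Errno -2] Name or service not known` means the DNS lookup of `prefect-apollo` failed inside the worker before any HTTP request was sent, so this is a name-resolution problem rather than an authentication or port problem. A small stdlib-only sketch (the `resolves` helper is hypothetical) that can be run from inside a worker pod, for example via `kubectl exec`, to test which candidate names resolve there:

```python
import socket
from urllib.parse import urlparse


def resolves(url_or_host: str) -> bool:
    """Return True if the hostname in a URL (or a bare hostname) resolves via DNS."""
    host = urlparse(url_or_host).hostname if "://" in url_or_host else url_or_host
    if not host:
        return False  # nothing to look up
    try:
        socket.getaddrinfo(host, None)
    except socket.gaierror:
        # Same failure class as "[Errno -2] Name or service not known" above
        return False
    return True
```

For example, comparing `resolves("http://prefect-apollo")` with a namespace-qualified variant of the same service name shows whether the short name is the problem.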
patrickd
08/10/2021, 8:01 PM
Sam Werbalowsky
08/10/2021, 8:15 PM
Nivi Mukka
08/13/2021, 1:31 AM
Dockerfile would work?
PREFECT__BACKEND="server"
PREFECT__SERVER__HOST
PREFECT__SERVER__PORT
PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.executors.DaskExecutor"
PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS
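Whichever way these variables are set (Dockerfile `ENV`, Helm values, or the pod spec), `PREFECT__SERVER__HOST` has to name something the worker pods can resolve. Kubernetes Service DNS names take the form `<service>.<namespace>.svc.<cluster-domain>`, so when the workers run in a different namespace from Prefect Server (the scheduler address in the worker logs above suggests a `data-common` namespace), a bare name like `prefect-apollo` will not resolve. A minimal sketch for building the qualified URL; `in_cluster_url` is a hypothetical helper, the `prefect` namespace is an illustrative placeholder, and the default `cluster.local` domain is assumed:

```python
def in_cluster_url(service: str, namespace: str, scheme: str = "http",
                   cluster_domain: str = "cluster.local") -> str:
    """Build a namespace-qualified Kubernetes Service URL.

    Assumes the cluster uses the default "cluster.local" DNS domain unless told otherwise.
    """
    return f"{scheme}://{service}.{namespace}.svc.{cluster_domain}"


# Candidate PREFECT__SERVER__HOST for an API service in a hypothetical "prefect" namespace
print(in_cluster_url("prefect-apollo", "prefect"))  # http://prefect-apollo.prefect.svc.cluster.local
```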
patrickd
08/13/2021, 2:03 AM
Nivi Mukka
08/13/2021, 2:03 AM
patrickd
08/13/2021, 2:06 AM
Point PREFECT__SERVER__HOST to http://<prefect apollo service name> and you should be set. Are you deploying with Helm?
Nivi Mukka
08/13/2021, 2:13 AM