Sylvain Hazard
10/27/2021, 10:15 AM
[2021-10-27 10:10:56+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'Dask Kubernetes Flow'
[2021-10-27 10:10:56+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `<flow>.<lambda>`...
Creating scheduler pod on cluster. This may take some time.
[2021-10-27 10:10:59+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://dask-root-ff14e4ba-c.default:8787/status
/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py:242: UserWarning: This flow was built using Prefect '0.15.3', but you currently have Prefect '0.15.1' installed. We recommend loading flows with the same Prefect version they were built with, failure to do so may result in errors.
warnings.warn(
distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=2 maximum=3
distributed.deploy.adaptive_core - INFO - Adaptive stop
distributed.deploy.adaptive_core - INFO - Adaptive stop
[2021-10-27 10:11:04+0000] INFO - prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f6139543c70>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f6138503580>, 1377825.083782873)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f6139543a60>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f61395438e0>
Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x7f613952de80>
transport: <_SelectorSocketTransport closing fd=12>
Traceback (most recent call last):
File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 910, in write
n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
self._transport.write(chunk)
File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
self._fatal_error(exc, 'Fatal write error on socket transport')
File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
self._force_close(exc)
File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
self._loop.call_soon(self._call_connection_lost, exc)
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
self._check_closed()
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f613844ce80>, 1377825.017547919), (<aiohttp.client_proto.ResponseHandler object at 0x7f613952eca0>, 1377825.095556365)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f6139543370>
distributed.deploy.adaptive_core - INFO - Adaptive stop
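One side note on the log above: the UserWarning shows a Prefect version mismatch (the flow was built with 0.15.3 but the image ships 0.15.1). A minimal sketch of one way to avoid it, assuming the environment that registers the flow runs 0.15.3, is to pin the same version in the Docker storage base image:
from prefect.storage import Docker

# Keep the image's Prefect version in sync with the version used to build/register the flow
# (0.15.3 here, matching the UserWarning above).
storage = Docker(base_image="prefecthq/prefect:0.15.3-python3.8")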
Anna Geller
10/27/2021, 10:23 AM
Sylvain Hazard
10/27/2021, 11:03 AM
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.storage import Docker
from prefect import task, Flow
from prefect.executors import DaskExecutor
import prefect
from datetime import timedelta
from prefect.run_configs import KubernetesRun

# Configure Docker storage; the base image pins Prefect 0.15.1 on Python 3.8
storage = Docker(
    base_image="prefecthq/prefect:0.15.1-python3.8",
    python_dependencies=[
        "dask-kubernetes==2021.3.1",
    ],
    registry_url="registry.hub.docker.com/syhazard/",
    extra_dockerfile_commands=[
        "RUN apt-get update && apt-get install -y curl &&\
        curl -LO https://dl.k8s.io/release/v1.21.2/bin/linux/amd64/kubectl &&\
        install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl &&\
        rm -rf /var/lib/apt/lists/*",
    ],
)

# Define some tasks for us to run in our flow
@task
def extract() -> list:
    return [1, 2, 3, 4, 5, 6]

@task(max_retries=3, retry_delay=timedelta(seconds=15))
def transform(number: int) -> int:
    return number * 2

@task()
def load(numbers: list) -> list:
    return [i for i in numbers if i]

with Flow(
    "Dask Kubernetes Flow",
    storage=storage,
    executor=DaskExecutor(
        cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
        adapt_kwargs={"minimum": 2, "maximum": 3},
    ),
    run_config=KubernetesRun(),
) as flow:
    numbers = extract()
    transformed_numbers = transform.map(numbers)
    numbers_twice = transform.map(transformed_numbers)
    result = load(numbers=numbers_twice)
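(A brief usage note, not from the thread: with Docker storage, registering the flow is what builds and pushes the image; the project name below is a placeholder.)
# Build and push the storage image, then create the flow version in the backend.
flow.register(project_name="dask-k8s-demo")  # hypothetical project name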
Anna Geller
10/27/2021, 12:40 PM
dask-root-edf2bafc-ex6954   0/1   ContainerCreating   0   0s
dask-root-edf2bafc-ex6954   0/1   ErrImagePull        0   8s
The setup that worked for me was to:
1. Build the image yourself and push it to Docker Hub
2. Use GitHub or cloud storage like S3 for Flow storage.
Here is a working example: https://github.com/anna-geller/packaging-prefect-flows/tree/master/ephemeral_dask_cluster
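To make that concrete, here is a minimal sketch of that setup, assuming a prebuilt image pushed to Docker Hub and GitHub flow storage (the repo, path, and image names are placeholders):
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor

IMAGE = "syhazard/dask-k8s-flow:latest"  # placeholder: image built and pushed manually

with Flow(
    "Dask Kubernetes Flow",
    # flow code is pulled from GitHub instead of being baked into a Docker image
    storage=GitHub(repo="syhazard/flows", path="flows/dask_kubernetes_flow.py"),
    run_config=KubernetesRun(image=IMAGE),
    executor=DaskExecutor(
        # the same prebuilt image is reused for the ephemeral Dask scheduler and workers
        cluster_class=lambda: KubeCluster(make_pod_spec(image=IMAGE)),
        adapt_kwargs={"minimum": 2, "maximum": 3},
    ),
) as flow:
    ...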
Sylvain Hazard
10/27/2021, 12:56 PM
Anna Geller
10/27/2021, 1:22 PM
Sylvain Hazard
10/27/2021, 1:24 PM
Anna Geller
10/27/2021, 1:26 PM
Sylvain Hazard
10/27/2021, 1:26 PM
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.240.0.130:8786
distributed.scheduler - INFO - dashboard at: :8787
distributed.scheduler - INFO - Receive client connection: Client-857bf6dc-3728-11ec-8006-f690f8c30bdd
distributed.core - INFO - Starting established connection
The Prefect flow pod does not log anything meaningful:
[2021-10-27 13:18:55+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `src.config.MetaConfig.k8s_executor.<locals>.<lambda>`...
Creating scheduler pod on cluster. This may take some time.
[2021-10-27 13:19:31+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://dask-root-6aa88517-5.default:8787/status
I cannot see any other Dask pod being started, and the flow keeps running without doing anything.
Anna Geller
10/27/2021, 1:28 PM
Sylvain Hazard
10/27/2021, 1:28 PM
run_config = KubernetesRun(
    image=f"{self.flow.image}",
    job_template_path=f"{JOB_TEMPLATE_PATH}",
    image_pull_policy="Always",
)
The KubeCluster does not have any of these secrets, which might be an issue?
Anna Geller
10/27/2021, 1:35 PM
make_pod_spec? Can you share a full example I could reproduce? This may be contradictory: my understanding is that KubeCluster takes the pod template created by make_pod_spec, while your job_template_path is only applied to the Flow pod, not to the Dask pods.
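Put differently (a hedged sketch, not from the thread): anything the Dask scheduler/worker pods need, such as environment variables, has to be set through make_pod_spec, because job_template_path only shapes the flow-run pod. The values below are placeholders:
from dask_kubernetes import KubeCluster, make_pod_spec
import prefect

def make_dask_cluster():
    # env vars set here land on the Dask pods, which the flow-run job template does not cover
    pod_spec = make_pod_spec(
        image=prefect.context.image,
        env={
            "PREFECT__BACKEND": "server",  # placeholder values for a self-hosted Server
            "PREFECT__SERVER__HOST": "http://prefect-apollo.default.svc.cluster.local",
        },
    )
    return KubeCluster(pod_spec)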
Sylvain Hazard
10/27/2021, 3:17 PM
I'm now passing the same job template to both the KubeCluster and the KubernetesRun instead of only specifying an image:
run_config = KubernetesRun(job_template_path=JOB_TEMPLATE_PATH)
executor = DaskExecutor(cluster_class=lambda: KubeCluster(JOB_TEMPLATE_PATH))
Anna Geller
10/27/2021, 3:39 PM
Sylvain Hazard
10/28/2021, 6:51 AM
1 - The flow run stays in Running and nothing happens. It feels hard to know what's wrong in this context 😕
2 - Everything is running on a k8s cluster running on Azure.
3/4 - I'll check this out with my DevOps team. As I'm new to the company, I might have overlooked some stuff on that end.
5 - I checked all Prefect-related pods and did not find anything that looked wrong. I believe the issue is on the Dask side, since it looks like it is the dask-root pod that keeps being stuck.
ERROR - prefect.CloudTaskRunner | Failed to retrieve task state with error: AuthorizationError([{'path': ['get_or_create_task_run_info'], 'message': 'Unauthenticated', 'extensions': {'code': 'UNAUTHENTICATED'}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
task_run_info = self.client.get_task_run_info(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1721, in get_task_run_info
result = self.graphql(mutation) # type: Any
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 556, in graphql
raise AuthorizationError(result["errors"])
prefect.exceptions.AuthorizationError: [{'path': ['get_or_create_task_run_info'], 'message': 'Unauthenticated', 'extensions': {'code': 'UNAUTHENTICATED'}}]
[2021-10-28 08:02:48+0000] INFO - prefect.CloudTaskRunner | Task 'SlackTask': Finished task run for task with final state: 'Pending'
I get a bunch of error messages like this one.
Any idea where that might be coming from?
I added PREFECT__SERVER__HOST and PREFECT__SERVER__PORT as well as a prefect backend server to my Dockerfile and the issue has disappeared. Getting closer to the goal ahah.
Again, thanks for your help on this issue and others, it's very comforting knowing it is that easy to get help when I need it.
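A hedged alternative to hand-editing the Dockerfile: the same environment variables can be baked into the image through Docker storage's env_vars argument (the host below is a placeholder for the in-cluster Prefect Server API endpoint):
from prefect.storage import Docker

storage = Docker(
    base_image="prefecthq/prefect:0.15.1-python3.8",
    env_vars={
        # point task runners inside the image at the self-hosted Prefect Server API
        "PREFECT__BACKEND": "server",
        "PREFECT__SERVER__HOST": "http://prefect-apollo.default.svc.cluster.local",  # placeholder
        "PREFECT__SERVER__PORT": "4200",
    },
)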
Anna Geller
10/28/2021, 9:06 AM