# prefect-server
s
Hi! I'm trying to run flows using temporary Dask clusters as described here. Using the exact same code as in the article, it does not seem to work as intended. When launching the flow, a job pod is properly created, followed by a Dask root pod. It lasts for a few seconds, then the Dask pod disappears and the job pod terminates. Job pod logs are below. Any idea what might be going wrong?
```
[2021-10-27 10:10:56+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'Dask Kubernetes Flow'
[2021-10-27 10:10:56+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `<flow>.<lambda>`...
Creating scheduler pod on cluster. This may take some time.
[2021-10-27 10:10:59+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://dask-root-ff14e4ba-c.default:8787/status
/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py:242: UserWarning: This flow was built using Prefect '0.15.3', but you currently have Prefect '0.15.1' installed. We recommend loading flows with the same Prefect version they were built with, failure to do so may result in errors.
  warnings.warn(
distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=2 maximum=3
distributed.deploy.adaptive_core - INFO - Adaptive stop
distributed.deploy.adaptive_core - INFO - Adaptive stop
[2021-10-27 10:11:04+0000] INFO - prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f6139543c70>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f6138503580>, 1377825.083782873)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f6139543a60>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f61395438e0>
Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x7f613952de80>
transport: <_SelectorSocketTransport closing fd=12>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f613844ce80>, 1377825.017547919), (<aiohttp.client_proto.ResponseHandler object at 0x7f613952eca0>, 1377825.095556365)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f6139543370>
distributed.deploy.adaptive_core - INFO - Adaptive stop
```
a
@Sylvain Hazard can you share a minimal example or your entire flow? I think your setup looks good, but it seems you have some unclosed database connections, file descriptors, or HTTP sessions.
s
There it is:
```python
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.storage import Docker
from prefect import task, Flow
from prefect.executors import DaskExecutor
import prefect
from datetime import timedelta
from prefect.run_configs import KubernetesRun

# Configure a storage object; by default, Prefect's latest image will be used
storage = Docker(
    base_image="prefecthq/prefect:0.15.1-python3.8",
    python_dependencies=[
        "dask-kubernetes==2021.3.1",
    ],
    registry_url="registry.hub.docker.com/syhazard/",
    extra_dockerfile_commands=[
        "RUN apt-get update && apt-get install -y curl &&\
        curl -LO https://dl.k8s.io/release/v1.21.2/bin/linux/amd64/kubectl &&\
        install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl &&\
        rm -rf /var/lib/apt/lists/*",
    ],
)

# Define some tasks for us to run in our flow
@task
def extract() -> list:
    return [1, 2, 3, 4, 5, 6]
  

@task(max_retries=3, retry_delay=timedelta(seconds=15))
def transform(number: int) -> int:
    return number * 2


@task()
def load(numbers: list) -> list:
    return [i for i in numbers if i]

with Flow(
    "Dask Kubernetes Flow",
    storage=storage,
    executor=DaskExecutor(
        cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
        adapt_kwargs={"minimum": 2, "maximum": 3},
    ),
    run_config=KubernetesRun(),
) as flow:
    numbers = extract()
    transformed_numbers = transform.map(numbers)
    numbers_twice = transform.map(transformed_numbers)
    result = load(numbers=numbers_twice)
```
Basically the same as in the article. My agent's RBAC configuration is not exactly the same, but it should be more permissive than what is proposed in the Medium article, so it shouldn't be an issue.
a
@Sylvain Hazard I was trying to reproduce the issue. It took me some time, but eventually I got it working with some adjustments. I believe the most difficult part of this setup (at least for me) was configuring the Docker image, its repository, and permissions across all components: the K8s cluster, the Dask scheduler, and the place from which you register flows. And you also use Server rather than Cloud, which complicates things even more. Using Docker storage, you would have to either:
1. Use a public image and thus expose your flow to the world, which you likely don't want to do, or
2. Configure Docker registry secrets on Server, the K8s cluster, and Dask.
The latter caused me some issues, and I ended up with many errors on the Dask side when pulling the image:
```
dask-root-edf2bafc-ex6954        0/1     ContainerCreating   0          0s
dask-root-edf2bafc-ex6954        0/1     ErrImagePull        0          8s
```
The setup that worked for me was to:
1. Build the image yourself and push it to Docker Hub.
2. Use GitHub or cloud storage like S3 for flow storage.
Here is a working example: https://github.com/anna-geller/packaging-prefect-flows/tree/master/ephemeral_dask_cluster
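For reference, a minimal sketch of that kind of setup; the repo, path, and image names below are illustrative placeholders, not the ones from the linked example:
```python
import prefect
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import task, Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GitHub


@task
def say_hello():
    print("hello from a Dask worker")


with Flow(
    "Dask Kubernetes Flow",
    # Flow code is pulled from GitHub at run time, so the Docker image does
    # not need to contain the flow source (placeholder repo and path).
    storage=GitHub(repo="your-org/your-flows", path="flows/dask_flow.py"),
    # The image you built and pushed to Docker Hub yourself (placeholder tag).
    run_config=KubernetesRun(image="your-dockerhub-user/prefect-dask:latest"),
    # The Dask pods reuse that same image, read from Prefect context at runtime.
    executor=DaskExecutor(
        cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
        adapt_kwargs={"minimum": 2, "maximum": 3},
    ),
) as flow:
    say_hello()
```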
s
Thanks a lot for the work you put into this! It makes perfect sense. To be perfectly honest, our actual use case involves a Git storage pointing to our GitLab. We use a custom Docker image both for registering flows (in GitLab CI) and for running them. Ideally Dask would use this image. It means that the image should have the necessary tokens to access GitLab in every one of these cases, right?
a
@Sylvain Hazard the flow code is retrieved by the agent, and the agent then offloads the actual processing to the executor (here: DaskExecutor), so Dask doesn't need access to GitLab anymore. I believe you could set this GitLab token in your agent yaml file (similar to dask.yml in the repo above) as an environment variable
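Something along these lines, as a rough sketch of the env section in the agent's Kubernetes manifest; the secret name and key are assumptions, not taken from the repo:
```yaml
# Excerpt from a hypothetical agent Deployment: expose the GitLab token to
# the agent as a Prefect local-secret environment variable.
env:
  - name: PREFECT__CONTEXT__SECRETS__GITLAB_ACCESS_TOKEN
    valueFrom:
      secretKeyRef:
        name: gitlab-token    # assumed Kubernetes Secret name
        key: access-token     # assumed key within that Secret
```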
s
Doesn't the Dask scheduler pod need access to GitLab in order to create the worker pods with the correct image?
a
I don't think so, because in our use case we are setting the image on the run configuration, and it's then retrieved by the DaskExecutor from the Prefect context
s
I'm running into a situation where the Dask root pod seems to be stuck.
```
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.240.0.130:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.scheduler - INFO - Receive client connection: Client-857bf6dc-3728-11ec-8006-f690f8c30bdd
distributed.core - INFO - Starting established connection
```
The Prefect flow pod does not log anything meaningful:
```
[2021-10-27 13:18:55+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `src.config.MetaConfig.k8s_executor.<locals>.<lambda>`...
Creating scheduler pod on cluster. This may take some time.
[2021-10-27 13:19:31+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://dask-root-6aa88517-5.default:8787/status
```
I cannot see any other Dask pod being started, and the flow keeps running without doing anything.
a
It's stuck with which pod status? You can see it using: kubectl get pods
s
Both are running
Oh, maybe the issue comes from the fact that I'm setting some Prefect secrets (including the GitLab token) in my job yaml template and setting up the Kubernetes run like this:
```python
run_config = KubernetesRun(
    image=f"{self.flow.image}",
    job_template_path=f"{JOB_TEMPLATE_PATH}",
    image_pull_policy="Always",
)
```
The `KubeCluster` does not have any of these secrets, which might be an issue?
a
Do you still use `make_pod_spec`? Can you share a full example I could reproduce? This may be contradictory: my understanding is that KubeCluster takes the pod template created by make_pod_spec, while your job_template_path is only applied to the flow pod, not to the Dask pods
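If so, a hedged workaround (assuming the secrets already reach the flow pod via your job template) could be to forward them to the Dask pods explicitly, since make_pod_spec accepts an env mapping:
```python
import os

import prefect
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

# Assumed variable name; forward whichever secrets the Dask pods need from
# the flow pod's environment (where the job template put them).
FORWARDED = ["PREFECT__CONTEXT__SECRETS__GITLAB_ACCESS_TOKEN"]

executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(
            image=prefect.context.image,
            env={k: os.environ[k] for k in FORWARDED if k in os.environ},
        )
    ),
)
```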
s
I can't seem to build an example that you would be able to use on your side, unfortunately. :( Basically, we are trying to reproduce what you did in your repo, but using a yaml template for both the `KubeCluster` and the `KubernetesRun` instead of only specifying an image.
At this point, I have something like this:
```python
run_config = KubernetesRun(job_template_path=JOB_TEMPLATE_PATH)

executor = DaskExecutor(cluster_class=lambda: KubeCluster(JOB_TEMPLATE_PATH))
```
Storage is still a Git storage pointing at the same image the k8s template uses.
a
@Sylvain Hazard Since I can't replicate the issue, I will just throw out a couple of ideas and things you may try:
1. Are you still getting the same error as before, or is it something else now?
2. Where are you running your Server, Dask, and Kubernetes agent: is everything on-prem, or on some cloud provider?
3. Permissions and security: perhaps some IAM roles or Kubernetes secrets are missing? Is your Docker image in a private repository? If so, permissions to pull the image need to be set up either via IAM roles or Kubernetes Secrets.
4. Networking: did you configure networking in some way? Perhaps some security groups or firewall rules are missing?
5. Did you check the logs on all pods and components to see if anything looks suspicious?
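One more hedged guess, since you're reusing JOB_TEMPLATE_PATH for both: KubeCluster expects a Pod spec, while KubernetesRun's job_template_path takes a Job manifest, so they are different shapes of yaml. A sketch with a separate pod template (both paths below are assumed file names):
```python
from dask_kubernetes import KubeCluster
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

# Assumed file names: a k8s Job manifest for the flow pod, and a separate
# *Pod* spec for the Dask worker pods; they are different kinds of manifests.
JOB_TEMPLATE_PATH = "templates/flow_job.yaml"
POD_TEMPLATE_PATH = "templates/dask_pod.yaml"

run_config = KubernetesRun(job_template_path=JOB_TEMPLATE_PATH)
executor = DaskExecutor(cluster_class=lambda: KubeCluster(POD_TEMPLATE_PATH))
```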
s
Thanks for the input!
1. At this point, what kind of bothers me is that there is no error message. Both the Dask root pod and the Prefect runner pod are Running, and nothing happens. It feels hard to know what's wrong in this context 😕
2. Everything is running on a k8s cluster on Azure.
3/4. I'll check this out with my DevOps team. As I'm new to the company, I might have overlooked some stuff on that end.
5. Checked all Prefect-related pods and did not find anything that looked wrong. I believe the issue is on the Dask side, since it looks like it is the Dask root pod that keeps being stuck.
Alright, I'm finally getting to a point where Dask starts some worker pods! However, it seems like there is an issue with authentication:
```
ERROR - prefect.CloudTaskRunner | Failed to retrieve task state with error: AuthorizationError([{'path': ['get_or_create_task_run_info'], 'message': 'Unauthenticated', 'extensions': {'code': 'UNAUTHENTICATED'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1721, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 556, in graphql
    raise AuthorizationError(result["errors"])
prefect.exceptions.AuthorizationError: [{'path': ['get_or_create_task_run_info'], 'message': 'Unauthenticated', 'extensions': {'code': 'UNAUTHENTICATED'}}]
[2021-10-28 08:02:48+0000] INFO - prefect.CloudTaskRunner | Task 'SlackTask': Finished task run for task with final state: 'Pending'
```
I get a bunch of error messages like this one. Any idea where that might be coming from?
We added OAuth2 authentication on top of the Server image in order to prevent external access to it, but the image used for the Dask workers is able to register flows in our CI, so I believe it should otherwise be able to communicate with the server?
Added both `PREFECT__SERVER__HOST` and `PREFECT__SERVER__PORT`, as well as a `prefect backend server` command, to my Dockerfile, and the issue has disappeared. Getting closer to the goal ahah. Again, thanks for your help on this issue and others, it's very comforting knowing it is that easy to get help when I need it.
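For anyone landing here later, the relevant Dockerfile lines look roughly like this; the host value is a placeholder for wherever your Server's Apollo endpoint lives in your cluster:
```dockerfile
# Point the Prefect client baked into the image at the self-hosted Server
# (placeholder host; 4200 is Apollo's default port).
ENV PREFECT__SERVER__HOST=http://prefect-apollo.prefect.svc.cluster.local
ENV PREFECT__SERVER__PORT=4200

# Switch the client from the default Cloud backend to the Server backend.
RUN prefect backend server
```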
a
So it was the Server’s Dockerfile all along… Interesting. Thanks for your update, and glad you figured it all out!