# prefect-community
t
Heyho 🐙 We tested out the Dask provider and think that Prefect 2.0b6 may create a Dask cluster with a version mismatch between client and server when using the `aws.FargateCluster` cloud provider. Steps to reproduce can be found in the thread 🙂
a
Can you please move the code blocks and details into the thread and only include the main problem in your main message? Thanks so much!
t
Steps to reproduce: take the Dask example from the Prefect 2.0 "getting started" guide and adapt the `DaskTaskRunner` to use Fargate (documentation). Full code below:
from prefect import task, flow
from prefect.task_runners import DaskTaskRunner

fargate_dask_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={
        "maximum": 100,
    }
)


@task
def say_hello(name):
    print(f"hello {name}")


@task
def say_goodbye(name):
    print(f"goodbye {name}")


@flow(task_runner=fargate_dask_runner)
def greetings(names):
    for name in names:
        say_hello(name)
        say_goodbye(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
Then install the dependencies:
pip install prefect==2.0b6 "dask-cloudprovider[aws]" dask
When I run the script, I get the following error:
11:58:52.731 | INFO    | prefect.engine - Created flow run 'knowing-zebu' for flow 'greetings'
11:58:52.732 | INFO    | Flow run 'knowing-zebu' - Using task runner 'DaskTaskRunner'
11:58:52.734 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_cloudprovider.aws.ecs.FargateCluster`
/home/tobias/anaconda3/envs/test-dask/lib/python3.7/contextlib.py:119: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources on AWS. Hang tight! 
  next(self.gen)
distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=100
/home/tobias/anaconda3/envs/test-dask/lib/python3.7/site-packages/distributed/client.py:1265: VersionMismatchWarning: Mismatched versions found

+-------------+-----------+-----------+---------+
| Package     | client    | scheduler | workers |
+-------------+-----------+-----------+---------+
| dask        | 2022.02.0 | 2022.6.0  | None    |
| distributed | 2022.02.0 | 2022.6.0  | None    |
| lz4         | 4.0.1     | 4.0.0     | None    |
+-------------+-----------+-----------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
12:00:23.378 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://34.240.46.11:8787/status>
12:00:24.381 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-0' for task 'say_hello'
12:00:24.845 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-0' for task 'say_goodbye'
12:00:25.192 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-1' for task 'say_hello'
12:00:25.470 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-1' for task 'say_goodbye'
12:00:25.723 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-2' for task 'say_hello'
12:00:25.969 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-2' for task 'say_goodbye'
12:00:26.229 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-3' for task 'say_hello'
12:00:26.610 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-3' for task 'say_goodbye'
12:01:02.654 | INFO    | Task run 'say_hello-d71d0552-0' - Crash detected! Execution was interrupted by an unexpected exception.
12:01:03.030 | INFO    | Task run 'say_goodbye-18d6ba96-0' - Crash detected! Execution was interrupted by an unexpected exception.
12:01:03.395 | INFO    | Task run 'say_hello-d71d0552-1' - Crash detected! Execution was interrupted by an unexpected exception.
12:01:03.686 | INFO    | Task run 'say_goodbye-18d6ba96-1' - Crash detected! Execution was interrupted by an unexpected exception.
12:01:03.907 | INFO    | Task run 'say_hello-d71d0552-2' - Crash detected! Execution was interrupted by an unexpected exception.
12:01:04.138 | INFO    | Task run 'say_goodbye-18d6ba96-2' - Crash detected! Execution was interrupted by an unexpected exception.
12:01:04.393 | INFO    | Task run 'say_hello-d71d0552-3' - Crash detected! Execution was interrupted by an unexpected exception.
12:01:04.630 | INFO    | Task run 'say_goodbye-18d6ba96-3' - Crash detected! Execution was interrupted by an unexpected exception.
^Cdistributed.deploy.adaptive_core - INFO - Adaptive stop
12:01:06.126 | ERROR   | Flow run 'knowing-zebu' - Crash detected! Execution was cancelled by the runtime environment.
12:01:06.563 | ERROR   | asyncio - unhandled exception during asyncio.run() shutdown
[...]
RuntimeError: IOLoop is closed
It seems like the client and scheduler versions are mismatched. I tried to match the client to the v2022.6.0 printed for the scheduler by running `pip install dask==2022.6.0`, but the weird part is that there is no version 2022.6.0 available. Does anyone have an idea of how to fix this, or another debugging approach to close in on the root cause? I'm not quite sure whether this is a bug in Prefect, a bug in Dask (/Dask-Cloudprovider), or whether I'm missing something important and the issue is with our implementation. Thanks a lot! 🙂
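The mismatch table can also be checked programmatically. The sketch below is a stdlib-only illustration, not Prefect or Dask code. It compares the client, scheduler, and worker versions copied from the warning above and keeps only the packages that disagree. On a live cluster, `distributed.Client.get_versions()` returns the same kind of per-component package information. The `normalize` step is an assumption added so that `2022.02.0` and `2022.2.0` compare as equal.

One hypothesis worth checking, which is not confirmed in this thread: the local environment runs Python 3.7 (see the `python3.7` paths in the log). As far as I know, Dask 2022.02.0 is the last release that supports Python 3.7. That would explain why pip cannot install 2022.6.0 there.

```python
from typing import Dict, Optional, Tuple, Union

# package -> {"client": ..., "scheduler": ..., "workers": ...}; None = not reported
VersionTable = Dict[str, Dict[str, Optional[str]]]


def normalize(version: str) -> Union[Tuple[int, ...], str]:
    """Treat "2022.02.0" and "2022.2.0" as equal; fall back to the raw string."""
    try:
        return tuple(int(part) for part in version.split("."))
    except ValueError:
        return version


def find_mismatches(table: VersionTable) -> VersionTable:
    """Return only the packages whose reported versions disagree."""
    mismatches = {}
    for package, by_component in table.items():
        reported = {normalize(v) for v in by_component.values() if v is not None}
        if len(reported) > 1:
            mismatches[package] = by_component
    return mismatches


# Values copied from the VersionMismatchWarning table in the log above
observed: VersionTable = {
    "dask": {"client": "2022.02.0", "scheduler": "2022.6.0", "workers": None},
    "distributed": {"client": "2022.02.0", "scheduler": "2022.6.0", "workers": None},
    "lz4": {"client": "4.0.1", "scheduler": "4.0.0", "workers": None},
}

for package, versions in find_mismatches(observed).items():
    print(package, versions)
```

All three packages are reported as mismatched. The `None` worker entries are skipped because no workers had connected when the warning was emitted.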
a
Fantastic write-up, thank you so much! The version mismatch is not too cool, but in the end, this is "just" a warning. The actual error you are getting is due to missing permissions. Check this issue for more background on this error: even though it was for Prefect 1.0, the root cause seems to be the same, namely insufficient IAM permissions. Can you try temporarily attaching admin rights to your task role?
If permissions don't help here, LMK and we can open an issue to investigate in more depth. At this point, though, I'm not sure whether the issue is about IAM role permissions (`RuntimeError: IOLoop is closed`), the version mismatch, or both.
t
That sounds totally plausible, thanks a lot! The permissions should not be an issue as far as I know. I'm not able to go with admin credentials but I have "nearly admin" credentials (cannot change organization and billing). Should be plenty, right? The image issue sounds interesting. By default, the cluster starts with `daskdev/dask:latest`. After some tests, I'm 99% sure that the default image is not able to deserialize the tasks and therefore crashes. Is there an official image I can use for Prefect 2.0, or is there any documentation on how to build one?
Quick update: I tried `prefecthq/prefect:2.0b6` and `prefecthq/prefect:latest`. Unfortunately, I did not get it to work.
a
> The permissions should not be an issue as far as I know. I'm not able to go with admin credentials but I have "nearly admin" credentials (cannot change organization and billing). Should be plenty, right?
When I faced exactly the same error, it was an IAM issue. Check the list of required permissions here: https://cloudprovider.dask.org/en/latest/_modules/dask_cloudprovider/aws/ecs.html#FargateCluster
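To cross-check IAM, one option is to compare the actions your policy allows against the required actions listed on that page. The sketch below is a hypothetical helper. `REQUIRED_ACTIONS` is an illustrative subset assumed for this example, not the authoritative list. The helper only models Allow statements with `*` wildcards and ignores Deny statements, permission boundaries, and organization SCPs, which can matter for "nearly admin" accounts.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List

# Illustrative subset only (an assumption for this sketch); the authoritative
# list is in the FargateCluster documentation linked above.
REQUIRED_ACTIONS: List[str] = [
    "ecs:CreateCluster",
    "ecs:RegisterTaskDefinition",
    "ecs:RunTask",
    "ecs:DescribeTasks",
    "ec2:DescribeSubnets",
    "ec2:CreateSecurityGroup",
    "iam:PassRole",
    "logs:DescribeLogGroups",
]


def missing_actions(allowed_patterns: Iterable[str],
                    required: Iterable[str] = REQUIRED_ACTIONS) -> List[str]:
    """Return the required actions that no Allow pattern matches.

    IAM action names are case-insensitive and allow "*" wildcards, so both
    sides are lower-cased before glob matching. Deny statements, permission
    boundaries and SCPs are NOT modelled here.
    """
    patterns = [p.lower() for p in allowed_patterns]
    return [
        action for action in required
        if not any(fnmatchcase(action.lower(), p) for p in patterns)
    ]


print(missing_actions(["ecs:*", "ec2:*"]))  # ['iam:PassRole', 'logs:DescribeLogGroups']
```

For real policies, the IAM policy simulator (for example `aws iam simulate-principal-policy`) is the authoritative check. This helper only narrows down where to look.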
`latest` is for Prefect 1.0, so you should be using this image: `prefecthq/prefect:2.0b6`
I'd recommend:
1. Cross-checking IAM.
2. Or signing up for a trial AWS account (your personal one) and trying it with an admin role there.
3. If neither option works, it makes sense to open a GitHub issue. It looks like you've already investigated this in quite some depth when you say "I'm 99% sure that the default image is not able to deserialize the tasks and therefore crashes." It would be great to write that up in a GitHub issue, and I could transcribe this conversation into the same issue as a reference if you send a link.
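Following the image recommendation, the scheduler and worker image can be pinned through the runner's `cluster_kwargs`, which are forwarded to `FargateCluster`. Its `image` parameter is what otherwise defaults to `daskdev/dask:latest`. The helpers below are hypothetical and stdlib-only. The `-pythonX.Y` tag suffix is an assumption about how the Prefect images are tagged, so check Docker Hub before relying on it. Matching the image's Python version to the client's (3.7 in the log) may be worth trying, because Dask generally expects client, scheduler, and workers to run matching Python and package versions.

```python
from typing import Dict, Optional, Tuple


def prefect_image(prefect_version: str,
                  python: Optional[Tuple[int, int]] = None) -> str:
    """Build an image reference; the "-pythonX.Y" suffix format is an assumption."""
    tag = prefect_version
    if python is not None:
        tag += "-python{}.{}".format(*python)
    return "prefecthq/prefect:" + tag


def fargate_cluster_kwargs(image: str) -> Dict[str, object]:
    """Keyword arguments to forward to dask_cloudprovider.aws.FargateCluster."""
    return {"image": image}


cluster_kwargs = fargate_cluster_kwargs(prefect_image("2.0b6"))
print(cluster_kwargs)  # {'image': 'prefecthq/prefect:2.0b6'}

# Hypothetical wiring into the runner from the original script:
# fargate_dask_runner = DaskTaskRunner(
#     cluster_class="dask_cloudprovider.aws.FargateCluster",
#     cluster_kwargs=cluster_kwargs,
#     adapt_kwargs={"maximum": 100},
# )
```

Keeping the image choice in one small function makes it easy to try several tags (for example with and without a Python suffix) without editing the runner definition each time.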
t
Yep, the worker logs:
2022-06-16 11:40:49,164 - distributed.worker - ERROR - Could not deserialize task say_goodbye-18d6ba96-0-8c61c69add414402a87922bf3e637526
I created a clean new IAM user with admin permissions on a separate AWS account, ran the flow again, and unfortunately could not resolve it. I'll open a GH issue and update this thread once it's created. Thanks a lot for your support! I have never ever seen such great support and engagement from any company. You and Kevin were highly praised during our Prefect Berlin Meetup earlier this week 😄
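The worker log line `Could not deserialize task ...` fits the hypothesis that the worker image lacks something the client used when serializing the task. Dask sends tasks to workers as pickled bytes. Objects defined in importable modules are pickled by reference (module name plus attribute name), so the worker must be able to import the same modules.

The stdlib sketch below uses plain `pickle` and a hypothetical throwaway module `fake_prefect` as a stand-in. It is not Dask's actual serialization path, only an illustration of this failure mode.

```python
import pickle
import sys
import types


def register_fake_module(name: str) -> types.ModuleType:
    """Create a throwaway importable module that defines one class."""
    module = types.ModuleType(name)
    exec(
        "class TaskPayload:\n"
        "    def __init__(self, task_name):\n"
        "        self.task_name = task_name\n",
        module.__dict__,
    )
    sys.modules[name] = module
    return module


def serialize_task(module_name: str, task_name: str) -> bytes:
    """Client side: pickle an object whose class lives in `module_name`."""
    module = register_fake_module(module_name)
    return pickle.dumps(module.TaskPayload(task_name))


def deserialize_on_worker(blob: bytes, module_name: str, installed: bool) -> str:
    """Worker side: unpickling must re-import `module_name` by name."""
    if installed:
        register_fake_module(module_name)
    else:
        sys.modules.pop(module_name, None)  # simulate an image without the package
    try:
        payload = pickle.loads(blob)
    except ModuleNotFoundError as exc:
        return "Could not deserialize task: {}".format(exc)
    return "ok: " + payload.task_name


blob = serialize_task("fake_prefect", "say_goodbye")
print(deserialize_on_worker(blob, "fake_prefect", installed=True))   # ok: say_goodbye
print(deserialize_on_worker(blob, "fake_prefect", installed=False))  # Could not deserialize task: No module named 'fake_prefect'
```

The same bytes load fine when the module is importable and fail when it is not. This is why the worker image needs the same packages (and ideally the same versions) as the client.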
a
Thank you so much! Let me create an issue directly here to make it easier, and you can add extra details to the issue as comments: @Marvin open "`DaskTaskRunner` cannot deserialize tasks when running on ECS Dask Cloud Provider - RuntimeError: IOLoop is closed"
t
Oh, sure, thanks! I was already writing the bug report based on your ticket template, so I posted it as a comment. If this makes sense for you folks, we can leave it as is. If it's easier to read and understand with only the template-based description, I can create a new issue. What do you prefer?
a
your comment looks great, let's keep it as is 👍
(both comments look great)