    Toby Rahloff

    3 months ago
    Heyho 🐙 We tested out the Dask provider and think that Prefect version 2.0b6 may create a Dask cluster with a version mismatch between client and scheduler when using the "aws.FargateCluster" cloud provider. Steps to reproduce can be found in the thread 🙂
    Anna Geller

    3 months ago
    Can you please move the code blocks and details into the thread and only include the main problem in your main message? Thanks so much!
    Toby Rahloff

    3 months ago
    Steps to reproduce: Use the Prefect 2.0 "getting started" for Dask code and adapt the DaskTaskRunner to use Fargate (documentation). Full code below:
    from prefect import task, flow
    from prefect.task_runners import DaskTaskRunner
    
    fargate_dask_runner = DaskTaskRunner(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        adapt_kwargs={
            "maximum": 100,
        }
    )
    
    
    @task
    def say_hello(name):
        print(f"hello {name}")
    
    
    @task
    def say_goodbye(name):
        print(f"goodbye {name}")
    
    
    @flow(task_runner=fargate_dask_runner)
    def greetings(names):
        for name in names:
            say_hello(name)
            say_goodbye(name)
    
    
    if __name__ == "__main__":
        greetings(["arthur", "trillian", "ford", "marvin"])
    Then install the dependencies:
    pip install prefect==2.0b6 "dask-cloudprovider[aws]" dask
    When running the script I run into the following error:
    11:58:52.731 | INFO    | prefect.engine - Created flow run 'knowing-zebu' for flow 'greetings'
    11:58:52.732 | INFO    | Flow run 'knowing-zebu' - Using task runner 'DaskTaskRunner'
    11:58:52.734 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_cloudprovider.aws.ecs.FargateCluster`
    /home/tobias/anaconda3/envs/test-dask/lib/python3.7/contextlib.py:119: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources on AWS. Hang tight! 
      next(self.gen)
    distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=100
    /home/tobias/anaconda3/envs/test-dask/lib/python3.7/site-packages/distributed/client.py:1265: VersionMismatchWarning: Mismatched versions found
    
    +-------------+-----------+-----------+---------+
    | Package     | client    | scheduler | workers |
    +-------------+-----------+-----------+---------+
    | dask        | 2022.02.0 | 2022.6.0  | None    |
    | distributed | 2022.02.0 | 2022.6.0  | None    |
    | lz4         | 4.0.1     | 4.0.0     | None    |
    +-------------+-----------+-----------+---------+
      warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
    12:00:23.378 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://34.240.46.11:8787/status>
    12:00:24.381 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-0' for task 'say_hello'
    12:00:24.845 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-0' for task 'say_goodbye'
    12:00:25.192 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-1' for task 'say_hello'
    12:00:25.470 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-1' for task 'say_goodbye'
    12:00:25.723 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-2' for task 'say_hello'
    12:00:25.969 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-2' for task 'say_goodbye'
    12:00:26.229 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_hello-d71d0552-3' for task 'say_hello'
    12:00:26.610 | INFO    | Flow run 'knowing-zebu' - Created task run 'say_goodbye-18d6ba96-3' for task 'say_goodbye'
    12:01:02.654 | INFO    | Task run 'say_hello-d71d0552-0' - Crash detected! Execution was interrupted by an unexpected exception.
    12:01:03.030 | INFO    | Task run 'say_goodbye-18d6ba96-0' - Crash detected! Execution was interrupted by an unexpected exception.
    12:01:03.395 | INFO    | Task run 'say_hello-d71d0552-1' - Crash detected! Execution was interrupted by an unexpected exception.
    12:01:03.686 | INFO    | Task run 'say_goodbye-18d6ba96-1' - Crash detected! Execution was interrupted by an unexpected exception.
    12:01:03.907 | INFO    | Task run 'say_hello-d71d0552-2' - Crash detected! Execution was interrupted by an unexpected exception.
    12:01:04.138 | INFO    | Task run 'say_goodbye-18d6ba96-2' - Crash detected! Execution was interrupted by an unexpected exception.
    12:01:04.393 | INFO    | Task run 'say_hello-d71d0552-3' - Crash detected! Execution was interrupted by an unexpected exception.
    12:01:04.630 | INFO    | Task run 'say_goodbye-18d6ba96-3' - Crash detected! Execution was interrupted by an unexpected exception.
    ^Cdistributed.deploy.adaptive_core - INFO - Adaptive stop
    12:01:06.126 | ERROR   | Flow run 'knowing-zebu' - Crash detected! Execution was cancelled by the runtime environment.
    12:01:06.563 | ERROR   | asyncio - unhandled exception during asyncio.run() shutdown
    [...]
    RuntimeError: IOLoop is closed
    It seems like the client/scheduler version is mismatched. I tried to match the v2022.6.0 that is printed for the scheduler on the client by
    pip install dask==2022.6.0
    but the weird part is that pip cannot find a version 2022.6.0. Does anyone have an idea how to fix this, or another debugging approach to narrow down the root cause? I'm not quite sure whether this is a bug in Prefect, a bug in Dask (/Dask-Cloudprovider), or if I'm missing something important and the issue is with our implementation. Thanks a lot! 🙂
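A minimal sketch for narrowing down the mismatch, assuming the version numbers are copied by hand from the VersionMismatchWarning table in the log. The helper name `find_mismatches` is hypothetical and not part of Dask or Prefect. One possible explanation for the failed install, not confirmed anywhere in this thread: the paths in the log show a Python 3.7 environment, and pip only offers releases that declare compatibility with the running interpreter, so a release that requires a newer Python would look unavailable.

```python
from typing import Dict, List, Optional, Tuple

Versions = Dict[str, Optional[str]]


def find_mismatches(
    client: Versions, scheduler: Versions
) -> List[Tuple[str, Optional[str], Optional[str]]]:
    """Return (package, client_version, scheduler_version) for each differing package."""
    mismatches = []
    for package in sorted(set(client) | set(scheduler)):
        client_version = client.get(package)
        scheduler_version = scheduler.get(package)
        if client_version != scheduler_version:
            mismatches.append((package, client_version, scheduler_version))
    return mismatches


# Values copied from the VersionMismatchWarning table in the log.
client_versions = {"dask": "2022.02.0", "distributed": "2022.02.0", "lz4": "4.0.1"}
scheduler_versions = {"dask": "2022.6.0", "distributed": "2022.6.0", "lz4": "4.0.0"}

for package, client_version, scheduler_version in find_mismatches(
    client_versions, scheduler_versions
):
    print(f"{package}: client={client_version} scheduler={scheduler_version}")
```

On a live cluster, `distributed.Client.get_versions()` returns the versions seen by the client, the scheduler, and the workers, which avoids copying them by hand.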
    Anna Geller

    3 months ago
    Fantastic write-up, thank you so much! The version mismatch is not too cool, but in the end, this is "just" a warning. The actual error you are getting is due to missing permissions. Check this issue for more background on this error: even though it was for Prefect 1.0, the root cause seems to be the same, namely insufficient IAM permissions. Can you try temporarily attaching admin rights to your task role?
    If permissions are not helping here, LMK and we can open an issue to investigate in more depth. However, at this time, I'm not sure whether the issue is about the permissions of the IAM roles (RuntimeError: IOLoop is closed), about the version mismatch, or both.
    Toby Rahloff

    3 months ago
    That totally sounds plausible - thanks a lot! The permissions should not be an issue as far as I know. I'm not able to go with admin credentials but I have "nearly admin" credentials (cannot change organization and billing). Should be plenty, right? The image issue sounds interesting. By default, it starts with "daskdev/dask:latest". After some tests, I'm 99% sure that the default image is not able to deserialize the tasks and therefore crashes. Is there an official image I can use for Prefect 2.0 or is there any documentation on how to build it?
    Quick update: I tried "prefecthq/prefect:2.0b6" and "prefecthq/prefect:latest". Unfortunately, I did not get it to work.
    Anna Geller

    3 months ago
    The permissions should not be an issue as far as I know. I'm not able to go with admin credentials but I have "nearly admin" credentials (cannot change organization and billing). Should be plenty, right?
    Previously, exactly the same error turned out to be an IAM issue. Check the list here: https://cloudprovider.dask.org/en/latest/_modules/dask_cloudprovider/aws/ecs.html#FargateCluster
    "latest" is for Prefect 1.0, so you should be using this image: prefecthq/prefect:2.0b6
    I'd recommend:
    1. cross-checking IAM,
    2. or signing up for a trial AWS account (your personal one) and trying with an admin role there to check this,
    3. and if both options above don't work, opening a GitHub issue. It looks like you have already investigated this in some depth when you say "I'm 99% sure that the default image is not able to deserialize the tasks and therefore crashes." It would be great to write that up in a GitHub issue. I could transcribe this conversation into the same issue as a reference if you send a link.
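For reference, a minimal sketch of how a specific image could be passed to the cluster, assuming that `DaskTaskRunner` forwards `cluster_kwargs` to the cluster class and that `FargateCluster` accepts an `image` argument for its scheduler and worker containers. The helper name `fargate_runner_kwargs` is hypothetical. The thread does not confirm that this image contains everything the Dask scheduler and workers need, so treat the image name as a placeholder.

```python
from typing import Any, Dict


def fargate_runner_kwargs(image: str, maximum: int = 100) -> Dict[str, Any]:
    """Build keyword arguments for DaskTaskRunner with an explicit container image."""
    return {
        "cluster_class": "dask_cloudprovider.aws.FargateCluster",
        # Assumption: these kwargs are handed to FargateCluster, where `image`
        # selects the container used for the scheduler and the workers.
        "cluster_kwargs": {"image": image},
        # Same adaptive scaling limit as in the script from this thread.
        "adapt_kwargs": {"maximum": maximum},
    }


kwargs = fargate_runner_kwargs("prefecthq/prefect:2.0b6")
print(kwargs)

# Usage (requires Prefect and dask-cloudprovider, not executed here):
#   from prefect.task_runners import DaskTaskRunner
#   fargate_dask_runner = DaskTaskRunner(**kwargs)
```

Keeping the arguments in a plain dictionary makes it easy to swap the image while testing without touching the flow definition.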
    Toby Rahloff

    3 months ago
    Yep, the worker logs show:
    2022-06-16 11:40:49,164 - distributed.worker - ERROR - Could not deserialize task say_goodbye-18d6ba96-0-8c61c69add414402a87922bf3e637526
    I created a clean new IAM user with admin permissions on a separate AWS account and ran it, but unfortunately that did not resolve it. Will open a GH issue and update this thread once it's created. Thanks a lot for your support! I have never ever seen such great support and engagement from any company. You and Kevin were highly praised during our Prefect Berlin Meetup earlier this week 😄
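One way a "could not deserialize" error can arise is sketched below, assuming the worker image lacks a package that is installed on the client. This is an illustration with the standard library `pickle` module, not a confirmed diagnosis of this thread's error. The module name `client_only_pkg` and the function `demo_missing_module` are hypothetical.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path


def demo_missing_module() -> str:
    """Pickle a function by reference, then load it where its module is missing."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical module standing in for a package installed only on the client.
        Path(tmp, "client_only_pkg.py").write_text(
            "def say_hello(name):\n    return f'hello {name}'\n"
        )
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module("client_only_pkg")
            # pickle stores only the module name and the function name.
            payload = pickle.dumps(module.say_hello)
        finally:
            # Simulate the worker environment: the module is no longer importable.
            sys.path.remove(tmp)
            sys.modules.pop("client_only_pkg", None)

    importlib.invalidate_caches()
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return f"Could not deserialize: {exc}"
    return "deserialized fine"


print(demo_missing_module())
```

If this is what happens here, the worker image would need the same packages as the client, ideally in matching versions.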
    Anna Geller

    3 months ago
    thank you so much! Let me create an issue directly here to make it easier and you can add extra details into the issue as comments: @Marvin open "DaskTaskRunner cannot deserialize tasks when running on ECS Dask Cloud Provider - RuntimeError: IOLoop is closed"
    Marvin

    3 months ago
    Toby Rahloff

    3 months ago
    Oh, sure, thanks! I was already writing the bug report based on your ticket template. I posted it as a comment. If this makes sense for you folks we can leave it as is. If it is easier to read and understand when we only have the description based on your template I can create a new issue. What do you prefer?
    Anna Geller

    3 months ago
    your comment looks great, let's keep it as is 👍
    (both comments look great)