    Scott Aefsky

    5 months ago
    I'm running into some memory limits with a flow that uses an ECSRun run config and a DaskExecutor with a LocalCluster. The first task in the flow generates a list of file keys, and then I map over that list to process the files. I would have expected that when an individual mapped child task completes, any memory used in that task would be released (aside from the return value). But that doesn't seem to be the case, as the process eventually runs out of memory after several mapped children finish. Is there something I can do in the configuration of the flow or task to mitigate this? As an aside, when I run this same flow using a FargateCluster, it completes without issue. I'm not sure if that's because the FargateCluster I'm using just has more memory, or if there's something inherently different between the two cluster setups. Thanks as always for any help you can provide.
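    In case it helps, the flow structure is roughly this (a minimal sketch with placeholder names and dummy logic, not my actual code):
    from prefect import task, Flow

    @task
    def list_file_keys():
        # placeholder: returns ~50 keys, each pointing at a batch of ~500 small files
        return [f"batch-{i}" for i in range(50)]

    @task
    def process_batch(key):
        # placeholder: load the files for this key, process them, and return
        # only a small summary, so the large intermediates should be free to
        # be released once the task finishes
        return {"key": key, "row_count": 0}

    with Flow("process-files") as flow1:
        keys = list_file_keys()
        results = process_batch.map(keys)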
    Anna Geller

    5 months ago
    can you share your ECSRun run config?
    Scott Aefsky

    5 months ago
    ECSRun(
        image=ECR_IMAGE,
        execution_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-execution-role',
        task_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-task-role',
        run_task_kwargs={
            "cluster": "PrefectCluster",
        },
        labels=[account]
    )
    Anna Geller

    5 months ago
    and now your executor? 🙂 how do you attach the executor to your flow?
    Scott Aefsky

    5 months ago
    And using a task definition of:
    networkMode: awsvpc
    cpu: 1024
    memory: 2048
    containerDefinitions:
      - name: flow
        image: not_a_real_image
        logConfiguration:
          logDriver: awslogs
          options:
            awslogs-group: prefect_ecs_task_log_ACCOUNT
            awslogs-region: us-east-1
            awslogs-stream-prefix: ecs-prefect
            awslogs-create-group: "True"
    Executor is:
    def dynamic_executor():
        if prefect.context.parameters["use_fargate"] == 'true':
            return FargateCluster(
                image=ECR_IMAGE,
                n_workers=5,  # Must specify n_workers
                cluster_arn=fr"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
                task_role_arn=fr"arn:aws:iam::{account}:role/prefect-ecs-task-role",
                vpc=vpc,
                subnets=[subnet],
                security_groups=[sg],
                cloudwatch_logs_group=f'prefect_ecs_task_log_{account}'
            )
        else:
            return LocalCluster()

    flow1.executor = DaskExecutor(
        cluster_class=dynamic_executor
    )
    Anna Geller

    5 months ago
    in order for the executor to take effect, it must be properly set in your Flow because it's read from storage. It's best to attach it to the Flow directly like so:
    with Flow("yourflow", executor=DaskExecutor(
        cluster_class=dynamic_executor
    )) as flow:
    Scott Aefsky

    5 months ago
    I thought
    with Flow('flow', executor=myExecutor) as f:
    and
    with Flow('flow') as f: 
        <tasks>
    f.executor=myExecutor
    are functionally equivalent?
    Kevin Kho

    5 months ago
    They are but if you do:
    from some_other_file import flow1
    
    flow1.storage = ..
    flow1.executor = ..
    flow1.register()
    the executor is not stored because it’s pulled from the Flow definition in storage. Is it in the same file for you?
    Anna Geller

    5 months ago
    They are, but sometimes people attach it too late and it must be available on the flow object in order for it to be used. E.g. a common antipattern is:
    with Flow('flow') as f: 
        <tasks>
    if __name__ == "__main__":
        f.executor=DaskExecutor()
    and the `__main__` block is not evaluated when the registered flow runs from storage. A likely reason your flow runs out of memory with a local Dask cluster is that the ECS task only has 1 CPU and 2 GB of memory, so it's plausible that your flow run hits OOM errors in that setting.
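    For comparison, a pattern that does get picked up is attaching the executor as part of the module-level flow definition (a sketch; the flow name is a placeholder and dynamic_executor is your function from above):
    from prefect import Flow
    from prefect.executors import DaskExecutor

    # attached at module level, so it is part of the flow definition
    # that ends up in storage at registration time
    with Flow("flow", executor=DaskExecutor(cluster_class=dynamic_executor)) as f:
        ...  # <tasks>

    # no executor assignment hidden under `if __name__ == "__main__":`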
    Scott Aefsky

    5 months ago
    Yes, everything is in the same file, and in the same scope.
    I get that increasing the memory for the task would help, but I am using relatively small batches. I was expecting that each of the mapped tasks would only hold on to the memory needed for what it returns. Is there some way I can tell Prefect/Dask to release the memory after a task is done?
    Kevin Kho

    5 months ago
    How many tasks are you mapping over? We normally only see issues around the 50k to 100k mapped task scale
    Scott Aefsky

    5 months ago
    50ish
    Each task handles 500 small files
    Kevin Kho

    5 months ago
    Ah ok. Have I shown you this video on handling unmanaged memory?
    Scott Aefsky

    5 months ago
    You have not
    I will take a look
    Kevin Kho

    5 months ago
    I dunno. May or may not help release that memory
    Scott Aefsky

    5 months ago
    Alright, I'll give a couple of things a shot.
    Unfortunately the jobs take a while, so I won't know if it helped for a couple of hours.
    Thanks as always!
    Kevin Kho

    5 months ago
    You added the env variable?
    Scott Aefsky

    5 months ago
    I'm going to try to do that
    I'm adding it to my default task definition:
    networkMode: awsvpc
    cpu: 1024
    memory: 2048
    containerDefinitions:
      - name: flow
        environment:
          - name: MALLOC_TRIM_THRESHOLD_
            value: "65536"
        image: not_a_real_image
        logConfiguration:
          logDriver: awslogs
          options:
            awslogs-group: prefect_ecs_task_log_ACCOUNT
            awslogs-region: us-east-1
            awslogs-stream-prefix: ecs-prefect
            awslogs-create-group: "True"
    Does that seem like the right way to do it?
    Kevin Kho

    5 months ago
    Looks right to me
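    If you want to double-check that the variable actually reaches the Dask worker processes, something like this should work (a quick sketch, assuming the LocalCluster workers inherit the container environment):
    import os
    from distributed import Client, LocalCluster

    cluster = LocalCluster()
    client = Client(cluster)

    # run a small function on every worker and print what each one sees
    print(client.run(lambda: os.environ.get("MALLOC_TRIM_THRESHOLD_")))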
    Scott Aefsky

    5 months ago
    Alright, I'll let you know if that improves things
    Arseniy Aseev

    5 months ago
    Interesting! I have run into a similar issue recently and thought I was losing my mind. I'm using a local Dask cluster, and 64 threads are handling 100-1000 files of 10-500 kB each, but the tasks are long-running and RAM usage grows relatively quickly (over tens of minutes to an hour) to an insane 2-4 GiB. I have been debugging this for several days now. Looking at the Dask dashboard, it's clear that the amount of unmanaged memory keeps growing very fast and doesn't reset or go down when tasks finish, as you'd expect. If you set a memory limit on the worker, it doesn't spill anything to disk and just crashes/restarts. The MALLOC_TRIM_THRESHOLD_ option is not available on (my) macOS, so it would be good to know if it works in your case. Official documentation for reference: https://distributed.dask.org/en/stable/worker-memory.html#automatically-trim-memory
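    For what it's worth, that page also shows a manual way to trim memory from the client, which is likewise glibc-only, so it won't help on macOS either (a sketch adapted from that section):
    import ctypes

    from distributed import Client

    def trim_memory() -> int:
        # glibc-only: ask the allocator to hand free heap memory back to the OS
        libc = ctypes.CDLL("libc.so.6")
        return libc.malloc_trim(0)

    client = Client()  # or a Client connected to your existing cluster
    client.run(trim_memory)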