# prefect-community
s
I'm running into some memory limits with a flow using an ECSRun config on a LocalCluster DaskExecutor. The first task in the flow is to generate a list of file keys, and then I map over that list to process the files. I would have expected that when an individual Mapped Child task completes, any memory used in that task would have been released (aside from the return value). But it seems like that is not the case, as the process eventually runs out of memory after several Mapped Children finish. Is there something I can do in the configuration of the flow or task to mitigate this? As an aside, when I run this same flow using a FargateCluster, it will complete without issue. I'm not sure if that's because the FargateCluster I'm using just has more memory, or if there's something inherently different between the two different DaskExecutor classes. Thanks as always for any help you can provide.
a
can you share your ECSRun run config?
s
ECSRun(
    image=ECR_IMAGE,
    execution_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-execution-role',
    task_role_arn=f'arn:aws:iam::{account}:role/prefect-ecs-task-role',
    run_task_kwargs={
        "cluster": "PrefectCluster",
    },
    labels=[account]
)
a
and now your executor? 🙂 how do you attach the executor to your flow?
s
And using a task definition of:
networkMode: awsvpc
cpu: 1024
memory: 2048
containerDefinitions:
  - name: flow
    image: not_a_real_image
    logConfiguration:
      logDriver: awslogs
      options:
        awslogs-group: prefect_ecs_task_log_ACCOUNT
        awslogs-region: us-east-1
        awslogs-stream-prefix: ecs-prefect
        awslogs-create-group: "True"
Executor is:
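# Imports were not shown in the original message; these are the presumed ones.
import prefect
from dask_cloudprovider.aws import FargateCluster
from distributed import LocalCluster
from prefect.executors import DaskExecutor


def dynamic_executor():
    # Pick the Dask cluster at run time from the "use_fargate" flow parameter.
    if prefect.context.parameters["use_fargate"] == "true":
        return FargateCluster(
            image=ECR_IMAGE,
            n_workers=5,  # Must specify n_workers
            cluster_arn=f"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn=f"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc=vpc,
            subnets=[subnet],
            security_groups=[sg],
            cloudwatch_logs_group=f"prefect_ecs_task_log_{account}",
        )
    else:
        # Runs inside the flow-run container, so it shares that container's memory.
        return LocalCluster()


flow1.executor = DaskExecutor(cluster_class=dynamic_executor)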
a
in order for the executor to take effect, it must be properly set in your Flow because it's read from storage. It's best to attach it to the Flow directly like so:
with Flow("yourflow", executor=DaskExecutor(
    cluster_class=dynamic_executor
)) as flow:
s
I thought
with Flow('flow', executor=myExecutor) as f:
and
with Flow('flow') as f: 
    <tasks>
f.executor=myExecutor
are functionally equivalent?
k
They are but if you do:
from some_other_file import flow1

flow1.storage = ..
flow1.executor = ..
flow1.register()
the executor is not stored because it’s pulled from the Flow definition in storage. Is it in the same file for you?
a
They are, but sometimes people attach it too late and it must be available on the flow object in order for it to be used. E.g. a common antipattern is:
with Flow('flow') as f: 
    <tasks>
if __name__ == "__main__":
    f.executor=DaskExecutor()
and the `__main__` block is not evaluated when an already-registered flow is loaded from storage. A likely reason your flow runs out of memory with a local Dask cluster is that the ECS task only has 1 vCPU and 2 GB of memory, so OOM errors are plausible in this setting.
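The `__main__` point above can be reproduced without Prefect. The sketch below is only an illustration: `Flow` here is a hypothetical stand-in class, not the Prefect one, and `load_flow` is an invented helper that mimics running the file as a script versus loading it as a module.

```python
# Source of a hypothetical flow file. "Flow" is a stand-in class, not Prefect's.
SOURCE = '''
class Flow:
    executor = "default"

f = Flow()

if __name__ == "__main__":
    f.executor = "dask"
'''


def load_flow(module_name):
    """Execute SOURCE as if it were loaded under the given module name."""
    namespace = {"__name__": module_name}
    exec(SOURCE, namespace)
    return namespace["f"]


as_script = load_flow("__main__")     # like `python flow_file.py`
as_import = load_flow("flow_module")  # like loading the file from storage

print(as_script.executor)
print(as_import.executor)
```

Only the script-style load reaches the guarded assignment, which is why an executor set under the guard is missing when the flow file is loaded some other way.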
s
Yes, everything is in the same file, and in the same scope.
I get that increasing the memory for the task would help, but I am using relatively small batches. I was expecting that each of the mapped tasks would only use the memory needed for what they return. Is there some way I can tell Prefect/Dask to dump the memory after the task is done?
k
How many tasks are you mapping over? We normally see issues at around the 50k to 100k mapped task scale
s
50ish
Each task handles 500 small files
k
Ah ok. Have I shown you this video to handle unmanaged memory?
s
You have not
I will take a look
k
I dunno. May or may not help release that memory
s
Alright, I'll give a couple of things a shot.
Unfortunately the jobs take a while, so I won't know if it helped for a couple of hours.
Thanks as always!
k
You added the env variable?
s
I'm going to try to do that
I'm adding it to my default task definition:
networkMode: awsvpc
cpu: 1024
memory: 2048
containerDefinitions:
  - name: flow
    environment: 
      - name: MALLOC_TRIM_THRESHOLD_
        value: "65536"
    image: not_a_real_image
    logConfiguration:
      logDriver: awslogs
      options:
        awslogs-group: prefect_ecs_task_log_ACCOUNT
        awslogs-region: us-east-1
        awslogs-stream-prefix: ecs-prefect
        awslogs-create-group: "True"
Does that seem like the right way to do it?
k
Looks right to me
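One way to confirm the variable actually reached the container is to read it back from inside the flow run. This is a minimal sketch: `malloc_trim_threshold` is a hypothetical helper name. It only reports what the process sees, and it assumes the value has to be in place before the Python process starts for glibc to honor it.

```python
import os

# The trailing underscore is part of the variable name.
ENV_NAME = "MALLOC_TRIM_THRESHOLD_"


def malloc_trim_threshold(environ=None):
    """Return the configured threshold in bytes, or None if it is not set."""
    environ = os.environ if environ is None else environ
    raw = environ.get(ENV_NAME)
    if raw is None or not raw.strip():
        return None
    return int(raw)


if __name__ == "__main__":
    # Inside the container this should print 65536 if the task definition took effect.
    print(malloc_trim_threshold())
```

Calling it from the first task and logging the result would show whether the task definition change was picked up.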
s
Alright, I'll let you know if that improves things
a
Interesting! I have run into a similar issue recently, and thought I was losing my mind. I’m using a local DaskCluster, and 64 threads are handling 100-1000 files worth 10-500 kB each, but the tasks are long-running and RAM usage grows relatively quickly (tens of minutes to an hour) to an insane 2-4 GiB. I have been debugging this for several days now. Looking at the Dask dashboard, it’s clear that the amount of unmanaged memory keeps growing very fast, and doesn’t reset or go down when some tasks are done as you’d expect. If you set a memory limit on the worker, it doesn’t spill anything to disk and just crashes/restarts. The MALLOC_TRIM_THRESHOLD_ option is not available on (my) macOS so it would be good to know if it works in your case. Official documentation for reference: https://distributed.dask.org/en/stable/worker-memory.html#automatically-trim-memory
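The same Dask documentation page also describes trimming memory manually by calling glibc's `malloc_trim`. Below is a minimal sketch of that idea. The platform guard and the `None` fallback are additions for illustration, not part of the docs. It only applies on Linux with glibc, so it would not help on macOS.

```python
import ctypes
import sys


def trim_memory():
    """Ask glibc to hand freed heap memory back to the operating system.

    Returns malloc_trim's result (1 if memory was released, 0 otherwise),
    or None when glibc's malloc_trim is unavailable (macOS, Windows, musl).
    """
    if not sys.platform.startswith("linux"):
        return None
    try:
        libc = ctypes.CDLL("libc.so.6")
        return libc.malloc_trim(0)
    except (OSError, AttributeError):
        return None


if __name__ == "__main__":
    print(trim_memory())
```

With a `distributed.Client` connected to the cluster, `client.run(trim_memory)` would run it on every worker. Whether calling it at the end of each mapped task helps in this flow is untested here.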