# ask-community
s
I’m seeing a linear memory increase in a flow until the container where my task is running is killed with an `OutOfMemoryError`. I’m using the `ECSRun` run configuration, configured via
Copy code
definition = {
    "networkMode": "awsvpc",
    "cpu": 4096,
    "memory": 30720,
    "containerDefinitions": [{"name": "flow"}],
    "executionRoleArn": cluster.execution_role_arn,
}
run_config = ECSRun(
    image=cluster.worker_image,
    labels=[recipe_bakery.id],
    task_definition=definition,
    run_task_kwargs={
        "tags": [
            {"key": "Project", "value": "pangeo-forge"},
            {"key": "Recipe", "value": recipe_name},
        ]
    },
)
I’m using the `DaskExecutor`, and after some fits and starts with memory configuration, my scheduler and workers are executing correctly until the flow run container fails.
I’m guessing there will be some questions about how our Flow and Tasks are configured. In the context of this project we are using an external library to translate some generic code into Tasks and Flows https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py
Are there any recommendations or suggestions on how I could potentially diagnose this increasing memory usage in the task run container? Again, memory usage is growing linearly with the number of “Expected Runs” in a mapped task to the point of failure.
z
Hey @Sean Harkins -- generally this is because we hold all task results until the flow is complete, so if you are passing large amounts of data from task to task you can run out of memory. Our current recommendation is to pass references (ie a file path) to large data structures between tasks instead of the data itself where possible.
It sounds like you're mapping over an input and loading things from disk into memory to process them? In that case, you may want to limit the number of concurrent mapped tasks, which can be done with "task tag" concurrency limits (Cloud only, Standard tier and above).
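A rough sketch of both ideas (the task name, tag, and paths here are just placeholders):
Copy code
from prefect import task

# Return a reference (e.g. an S3 path) rather than the loaded data itself, so the
# flow runner only ever holds small strings between tasks.
@task(tags=["heavy-io"])  # a tag like this can then be given a concurrency limit in Cloud
def process_chunk(source_url: str, cache_location: str) -> str:
    target_url = f"{cache_location}/{hash(source_url)}"
    # ... load, transform, and write the data to target_url here ...
    return target_url  # a small string reference, not the data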
s
@Zanie I’m somewhat confused about the execution context here between the task run and the DaskExecutor. My understanding of the interaction was that internally via the DaskExecutor my mapped tasks would be converted to a Dask graph and the scheduler would then handle distribution of those tasks to workers (where this loading and processing would occur). In this case I would see the increase and release of memory in my workers as data is processed (which I am seeing). Is my understanding accurate?
z
Roughly. The flow runner actually walks the graph and submits the tasks to Dask for execution, which handles distribution to workers, but the flow runner holds the results of all the tasks until the graph is done executing.
s
@Zanie We had attempted to prevent the flow runner from maintaining result references by not returning anything from our tasks, but I suspect we may be seeing an issue related to checkpointing being automatically enabled for Prefect Cloud. Is there a way via the UI to determine if checkpointing is enabled for a flow (or an individual task)?
I’m also curious why the selected task shown below is configured to use an S3Result when we are not explicitly returning a result (or configuring result storage).
z
Automatic checkpointing just means the return value from your tasks will be written to disk (or to your result type) and if you're not returning a result from your task it shouldn't have an effect.
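(For reference, a task can also opt out of checkpointing explicitly; a minimal sketch, though as noted it shouldn't matter when nothing is returned:)
Copy code
from prefect import task

@task(checkpoint=False)  # never write this task's return value to the result backend
def side_effect_only():
    # does its work in place and returns nothing
    return None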
Did you set a `flow.result` type?
s
No.
Also, I realize that this generic Flow creation is a bit convoluted. The developers on this project want to support multiple execution backends so that scientists can develop and test workflows without involving Prefect. In this case they use a base class which has a `to_pipelines` method https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/recipes/base.py#L78. The “stages” in this pipeline are methods on a subclass https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/recipes/xarray_zarr.py, none of which return a value. These stages are converted to Tasks by this `make_flow` method https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py#L42. Apologies for the labyrinth of code (I didn’t design this and it will most likely be drastically simplified soon). I can provide more details of the output flows if that is helpful.
z
I'm not sure why the `S3Result` is being set for that task but we only default to `LocalResult` as far as I know. I think you can check if checkpointing is enabled by adding a log to your task to display the value of `prefect.context.get("checkpointing")`. You should be able to also see it as an environment variable in the ECS task run.
(I do not think checkpointing would have an effect on memory consumption though)
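A minimal sketch of that logging suggestion (the task name is hypothetical):
Copy code
import prefect
from prefect import task

@task
def log_checkpointing_flag():
    logger = prefect.context.get("logger")
    logger.info("checkpointing=%s", prefect.context.get("checkpointing"))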
s
It doesn’t appear to be an environment variable on the ECSRun task.
Is there some way to determine why a task is returning a result? Looking at the serialized output of each of the tasks, I don’t see a result specified in any of them:
Copy code
{'max_retries': 0, 'timeout': None, 'inputs': {'key': {'required': True, 'type': 'typing.Any'}}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.MappedTaskWrapper', 'cache_key': None, 'name': 'MappedTaskWrapper', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.SingleTaskWrapper', 'cache_key': None, 'name': 'SingleTaskWrapper', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': True, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'prefect.tasks.core.constants.Constant', 'cache_key': None, 'name': 'Constant[list]', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': True, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'prefect.tasks.core.constants.Constant', 'cache_key': None, 'name': 'Constant[list]', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {'key': {'required': True, 'type': 'typing.Any'}}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.MappedTaskWrapper', 'cache_key': None, 'name': 'MappedTaskWrapper', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.SingleTaskWrapper', 'cache_key': None, 'name': 'SingleTaskWrapper', 'tags': [], '__version__': '0.14.19'}
z
I'm not sure what the best way to determine that is yet; checking in with the rest of the team.
Is it possible you're encountering dask workers holding onto the memory? ie https://stackoverflow.com/questions/63680134/growing-memory-usage-leak-in-dask-distributed-profiler
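(If it were, the usual workaround from that thread is to slow the profiler down via Dask config on the workers; a sketch using the documented distributed config keys:)
Copy code
import dask

# Run the distributed statistical profiler far less often so its history stays small.
# For remote workers this needs to be set in their environment instead, e.g. via the
# DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL / __CYCLE environment variables.
dask.config.set({
    "distributed.worker.profile.interval": "10s",  # default is 10ms
    "distributed.worker.profile.cycle": "1000s",   # default is 1s
})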
s
I don’t think this is the issue. The profiler would be running on the dask scheduler container rather than the container where the ECSRun task is being executed.
z
That makes sense.
s
I think I misstated that a bit. We would see the profiler memory usage on both the scheduler and the worker containers.
But I don’t believe we would on the task run container, though I don’t know about the dask client internals 😆
z
So just to clarify your setup: you have the "Flow" container which handles submitting to a `DaskExecutor`, which connects to a dask scheduler on another "Dask" container, which is also running the dask workers? Your tasks do not return large data structures and instead return small amounts of data to the "Flow" container, but the "Flow" container is running out of memory while the "Dask" container is doing fine?
Also, got some clarification from the team. The `S3Result` is the default result type when your flow has `S3` storage.
Could you show an example of what your tasks are returning? Or are you literally using an empty `return` in your tasks?
s
Close. From top to bottom:
1. The flow execution container created by `ECSRun`, which uses the `DaskExecutor` to submit jobs to…
2. The Dask scheduler, which runs on another container and distributes work to…
3. The workers, which run on other containers that the Dask scheduler spawns via adaptive scaling.
I was seeing some memory issues on the Dask side, but that was caused by having too small an initial worker pool; now that I have a minimum number of workers, things seem alright there. But memory usage grows steadily (and very rapidly) on the `ECSRun` task container.
z
Alright, that's helpful.
Do you know how many tasks you have in your flow?
Also, since your tasks are automatically receiving an `S3Result` type, they will be writing their return values to your S3 bucket. Could you take a look there and see if any of the results are of significant size? (I think `None` results are actually not written, to prevent making a bunch of empty files, just FYI.)
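(A quick way to eyeball the sizes, assuming a hypothetical results bucket and prefix:)
Copy code
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="my-results-bucket", Prefix="prefect-results/")
for obj in sorted(resp.get("Contents", []), key=lambda o: o["Size"], reverse=True)[:10]:
    print(obj["Size"], obj["Key"])  # largest result objects first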
s
Is there a way to alter the default result type to nothing?
z
You can set `PREFECT__FLOWS__CHECKPOINTING=False` in the environment for your run, but this should not have any effect on the flow container.
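(A sketch of setting that on the run config from the earlier snippet, using ECSRun's env argument; the other names are reused from that snippet:)
Copy code
run_config = ECSRun(
    image=cluster.worker_image,
    labels=[recipe_bakery.id],
    task_definition=definition,
    env={"PREFECT__FLOWS__CHECKPOINTING": "false"},  # disable checkpointing for this flow's runs
)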
s
For this flow the outputs to S3 for the failed flow runs are 2 files, one of 70 KB and another of 3.3 KB.
z
And your 14,000 mapped items are all returning `None`?
And I'm correctly seeing that your flow container has been allocated 32GB of memory?
s
Is there the possibility that their use of a wrapping decorator is causing an issue?
And what would be the best approach for me to determine what on the stack is consuming memory on the flow runner task?
z
Yeah, they do look to be returning `None` -- I'm a bit confused by the class-with-closure stuff that's going on there and I can't really dig into pangeo's implementation details. The flow container should just be holding on to the return values from tasks and the states of the tasks, which should fit easily into 32GB of memory with what you've outlined. If you can isolate where the memory consumption is coming from I can dig in some more and help find a solution, but I'm reaching the extent of what I can do from here.
As far as isolating memory consumption, I'm not sure what the best way is (but I'll ask if anyone has suggestions). Running your flow locally while still using your dask cluster to execute the work seems like a good first step.
s
@Zanie I have my agent running in a local docker container and again, while running locally I see the agent process killed by an OOM error. Do you or someone at Prefect have suggestions on how I should introspect the agent process on the container to investigate why we are seeing this high memory usage?
z
I would suggest using `flow.run` instead of the agent so you don't have to deal with subprocesses. I would then annotate some of the functions in the flow and task runners with a Python memory profiler and run the flow to see what it reports.
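For example, a rough sketch of that kind of local run with tracemalloc (a stdlib alternative to memory_profiler) wrapped around `flow.run()`, assuming your flow object is already built and the scheduler address is a placeholder:
Copy code
import tracemalloc
from prefect.executors import DaskExecutor

tracemalloc.start(25)  # keep 25 frames per allocation so tracebacks are useful

flow.executor = DaskExecutor(address="tcp://<scheduler-address>:8786")
state = flow.run()

snapshot = tracemalloc.take_snapshot()
for stat in snapshot.statistics("lineno")[:10]:
    print(stat)  # top allocation sites inside the local flow-runner process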
s
👍 Sounds like a plan. Thanks.
z
I can help with specifics after the weekend
s
Are you off on Monday?
z
Indeed
s
👍 I’ll try and touch base with you on Tuesday with some more information.
z
Sorry there's not a quick way to do this. Memory management in python is always fickle.
Sounds good! Thanks for being understanding :)
s
No worries. Thanks for the assistance.
@Zanie I did some quick `tracemalloc` instrumentation around `flow.run()` using `LocalRun` with a `DaskExecutor` pointing to my Dask cluster running on Fargate. On the local container where `flow.run()` is executing, I am seeing a rapid and steady increase in memory utilization prior to the OOM crash.
Copy code
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=66.9 MiB, count=6, average=11.2 MiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=1156 MiB, count=120, average=9868 KiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=2408 MiB, count=251, average=9826 KiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=3603 MiB, count=377, average=9787 KiB
Note that the Dask scheduler and workers continued to operate normally after the task run container crashed.
And a trace for the allocation up until the container is killed by the OOM:
Copy code
INFO:root:27 memory blocks: 1047174.9 KiB
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 890
INFO:root:    self._bootstrap_inner()
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 932
INFO:root:    self.run()
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 870
INFO:root:    self._target(*self._args, **self._kwargs)
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/concurrent/futures/thread.py", line 80
INFO:root:    work_item.run()
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/concurrent/futures/thread.py", line 57
INFO:root:    result = self.fn(*self.args, **self.kwargs)
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py", line 1468
INFO:root:    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/utils.py", line 31
INFO:root:    protocol.dumps(
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 70
INFO:root:    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py", line 35
INFO:root:    return Packer(**kwargs).pack(o)
Killed
z
Hey @Sean Harkins -- I've written up a quick branch that adds memory profiling. You can install it with `pip install prefect@git+<https://github.com/PrefectHQ/prefect@3eec97aa4182d93ac02f426abcd34b2780726d7e#egg=prefect>` and then run your flow with `python -m memory_profiler flow.py`.
You can see what you're installing here: https://github.com/PrefectHQ/prefect/compare/debug-mem
s
Thanks @Zanie, I’ll try to test later today using this profiler. One question: exactly how are a Flow’s tasks submitted by the `DaskExecutor`’s `submit` method? I’m tracking how this may be related to an upstream issue with whether our task’s underlying class can be serialized, but as Prefect does not actually construct a task graph I’d like to try and better understand how Tasks are sent to the dask scheduler. Can you point me to this code in the Prefect repo?
z
s
@Zanie We’ve been able to somewhat isolate the OOM errors we were seeing to what we think is a serialization issue with the flow. We’ll run some more tests today with an isolated portion of the flow to try and get a better handle on memory utilization. With our isolated Flow we are still seeing a slow, steady memory increase / leak on our workers, so I have a few questions related to this.
1. How can I include memory profiling for a task? Adding the `@profile` decorator results in `ValueError: Tasks with variable positional arguments (*args) are not supported` on registration, and when I extract the task’s body into another function and use the `@profile` decorator there, the logs just report `ERROR: Could not find file flow.py`. What is the best approach for instrumenting tasks for memory profiling?
z
Ah that's a tricky one; I don't think you'll be able to use that decorator. I think generally I'd look at the dask dashboard to see task level memory usage. What was the serialization issue?
s
I’m still trying to determine that, but the class used to create the flow is quite complex https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/recipes/xarray_zarr.py so it is difficult to diagnose. As for the task, I already know the dask task is leaking memory; I need information about what in the task is leaking, i.e. a way to see the stack trace for the highest memory allocation on the worker in the logs. Is there any approach for this? Normally I would just directly decorate my dask task code, but this is not an option with Prefect.
One other semi-related question. To simplify the debugging here I made a minimal Flow that only does our initial download-to-S3 step. I’m trying to understand how differences in the input being mapped over affect the dask execution. In this case:
Copy code
@task
def source_url(day: str) -> str:
    day = pd.Timestamp(day)
    source_url_pattern = (
        "<https://www.ncei.noaa.gov/data/>"
        "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
        "{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
    )
    return source_url_pattern.format(day=day)
@task
def download(source_url, cache_location):
    target_url = os.path.join(cache_location, str(hash(source_url)))

    try:
        fsspec.open(target_url).open()
        return target_url
    except FileNotFoundError:
        pass

    with fsspec.open(source_url, mode="rb") as source:
        with fsspec.open(target_url, mode="wb") as target:
            target.write(source.read())
    return target_url


with Flow(
    name,
    storage=storage.S3(bucket="pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9"),
    run_config=run_config,
    executor=dask_executor,
) as flow:
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").tolist(),
    )
    sources = source_url.map(days)
    nc_sources = download.map(
        sources,
        cache_location=unmapped(f"<s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/{name}/cache>"),
    )
I would expect to see the mapped download task executing in parallel across all of the workers. Instead I’m seeing tasks submitted and executed serially (Note the single task processing rather than all 5 workers being utilized)
But if I map over a Constant `list` like this
Copy code
def source_url(day: str) -> str:
    day = pd.Timestamp(day)
    source_url_pattern = (
        "<https://www.ncei.noaa.gov/data/>"
        "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
        "{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
    )
    return source_url_pattern.format(day=day)

@task
def download(source_url, cache_location):
    target_url = os.path.join(cache_location, str(hash(source_url)))

    try:
        fsspec.open(target_url).open()
        return target_url
    except FileNotFoundError:
        pass

    with fsspec.open(source_url, mode="rb") as source:
        with fsspec.open(target_url, mode="wb") as target:
            target.write(source.read())
    return target_url


with Flow(
    name,
    storage=storage.S3(bucket="pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9"),
    run_config=run_config,
    executor=dask_executor,
) as flow:
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").tolist(),
    )
    sources = list(map(source_url, pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").to_list()))
    nc_sources = download.map(
        sources,
        cache_location=unmapped(f"<s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/{name}/cache>"),
    )
Where `source_url` is a vanilla function rather than a `task`, I see full parallel worker utilization like I would have expected in the other example.
Can you explain why the behavior of these is different for the dask scheduler?
z
This might be Dask's data localization feature, ie the Parameter task is scheduled on one worker and returns a list, and then each subsequent task is scheduled on the worker that already has that data, but I'm honestly not sure. I've reached out to the rest of the team to see if they have any ideas.
Does this persist if you use a @task function to return your list rather than a parameter?
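(ie something along these lines, sketched against the example above:)
Copy code
import pandas as pd
from prefect import task

@task
def get_days() -> list:
    # same values the "days" Parameter held, but produced by an ordinary task
    return pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").tolist()

# then, inside the same Flow block as above:
#     sources = source_url.map(get_days())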
What would you be decorating your tasks with if you were just using dask?
s
@Zanie I was able to use some `tracemalloc` statements directly in my task to track things rather than using the `memory_profiler`.
z
Can you not use `tracemalloc.start()` directly in your task?
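(Something like this, sketched around a hypothetical task body:)
Copy code
import tracemalloc

import prefect
from prefect import task

@task
def download(source_url, cache_location):
    tracemalloc.start(25)  # record up to 25 frames per allocation
    # ... the existing download / caching work happens here ...
    logger = prefect.context.get("logger")
    for stat in tracemalloc.take_snapshot().statistics("traceback")[:3]:
        logger.info("%.1f KiB\n%s", stat.size / 1024, "\n".join(stat.traceback.format()))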
s
That is what I ended up doing.
z
Oh sorry I misread your message 🤦‍♂️
Glad that worked! I wonder if we should build that in.
s
@Zanie Here is the ticket we are using for tracking this https://github.com/pangeo-forge/pangeo-forge-recipes/issues/151