Sean Harkins
05/28/2021, 4:25 PM
OutOfMemoryError.
Sean Harkins
05/28/2021, 4:27 PM
ECSRun run configuration configured via
definition = {
    "networkMode": "awsvpc",
    "cpu": 4096,
    "memory": 30720,
    "containerDefinitions": [{"name": "flow"}],
    "executionRoleArn": cluster.execution_role_arn,
}
run_config = ECSRun(
    image=cluster.worker_image,
    labels=[recipe_bakery.id],
    task_definition=definition,
    run_task_kwargs={
        "tags": [
            {"key": "Project", "value": "pangeo-forge"},
            {"key": "Recipe", "value": recipe_name},
        ]
    },
)
Sean Harkins
05/28/2021, 4:29 PM
DaskExecutor, and after some fits and starts on memory configuration, my scheduler and worker are executing correctly until the flow run container fails.
Sean Harkins
05/28/2021, 4:30 PM
Sean Harkins
05/28/2021, 4:34 PM
Zanie
Zanie
Sean Harkins
05/28/2021, 4:47 PM
Zanie
Sean Harkins
05/28/2021, 5:30 PM
Sean Harkins
05/28/2021, 5:40 PM
Zanie
Zanie
flow.result type?
Sean Harkins
05/28/2021, 5:42 PM
Sean Harkins
05/28/2021, 5:43 PM
Sean Harkins
05/28/2021, 5:51 PM
to_pipelines method https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/recipes/base.py#L78. The "stages" in this pipeline are methods on a subclass https://github.com/pangeo-forge/pangeo-forge-recipes/blob/master/pangeo_forge_recipes/recipes/xarray_zarr.py, none of which have a return type. These stages are converted to Tasks by this make_flow method https://github.com/pangeo-data/rechunker/blob/master/rechunker/executors/prefect.py#L42. Apologies for the labyrinth of code (I didn't design this and it will most likely be drastically simplified soon). I can provide more details of the output flows if that is helpful.
Zanie
S3Result is being set for that task, but we only default to LocalResult as far as I know.
I think you can check whether checkpointing is enabled by adding a log to your task that displays the value of prefect.context.get("checkpointing"). You should also be able to see it as an environment variable in the ECS task run.
Zanie
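(For illustration, a minimal sketch of that check, assuming Prefect 0.14's context and logger APIs; the task itself is hypothetical and would be replaced by one of the recipe's real tasks:)
import prefect
from prefect import task

@task
def log_checkpointing_flag():
    # Log whether result checkpointing is enabled for this run; the same setting
    # should show up as PREFECT__FLOWS__CHECKPOINTING in the ECS task's environment.
    logger = prefect.context.get("logger")
    logger.info("checkpointing=%s", prefect.context.get("checkpointing"))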
Sean Harkins
05/28/2021, 6:32 PM
Sean Harkins
05/28/2021, 6:40 PM
{'max_retries': 0, 'timeout': None, 'inputs': {'key': {'required': True, 'type': 'typing.Any'}}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.MappedTaskWrapper', 'cache_key': None, 'name': 'MappedTaskWrapper', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.SingleTaskWrapper', 'cache_key': None, 'name': 'SingleTaskWrapper', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': True, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'prefect.tasks.core.constants.Constant', 'cache_key': None, 'name': 'Constant[list]', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': True, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'prefect.tasks.core.constants.Constant', 'cache_key': None, 'name': 'Constant[list]', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {'key': {'required': True, 'type': 'typing.Any'}}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.MappedTaskWrapper', 'cache_key': None, 'name': 'MappedTaskWrapper', 'tags': [], '__version__': '0.14.19'}
{'max_retries': 0, 'timeout': None, 'inputs': {}, 'cache_validator': {'fn': 'prefect.engine.cache_validators.never_use', 'kwargs': {}}, 'auto_generated': False, 'trigger': {'fn': 'prefect.triggers.all_successful', 'kwargs': {}}, 'skip_on_upstream_skip': True, 'slug': None, 'cache_for': None, 'retry_delay': None, 'outputs': 'typing.Any', 'type': 'rechunker.executors.prefect.SingleTaskWrapper', 'cache_key': None, 'name': 'SingleTaskWrapper', 'tags': [], '__version__': '0.14.19'}
Zanie
Zanie
Sean Harkins
05/28/2021, 7:51 PM
Zanie
Sean Harkins
05/28/2021, 8:06 PM
scheduler and the worker containers.
Sean Harkins
05/28/2021, 8:07 PM
Zanie
DaskExecutor, which is connecting to a Dask scheduler on another "Dask" container that is also running the Dask workers? Your tasks do not return large data structures and instead return small amounts of data to the "Flow" container, but the "Flow" container is running out of memory while the "Dask" container is doing fine?
Zanie
S3Result is the default result type when your flow has S3 storage.
Zanie
return in your tasks?
Sean Harkins
05/28/2021, 8:17 PM
1. ECSRun, which uses the DaskExecutor to submit jobs to …
2. The Dask scheduler, which runs on another container and distributes work to workers …
3. These workers are running on other containers which the Dask scheduler spawns via adaptive scaling.
I was seeing some memory issues on the Dask side, but that was caused by having too small an initial worker pool; now that I have a minimum number of workers, things seem alright on the Dask side.
But memory usage grows steadily (very rapidly) on the ECSRun task container.
Zanie
Zanie
Zanie
S3Result type, they will be writing their return values to your S3 bucket. Could you take a look there and see if any of the results are of significant size? (I think None results are actually not written, to prevent making a bunch of empty files, just FYI.)
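(Illustrative sketch of one way to eyeball result sizes with s3fs, which the flow environment already has via fsspec; the bucket and prefix below are placeholders for wherever your results are being written:)
import s3fs

fs = s3fs.S3FileSystem()
# Hypothetical result location; substitute the bucket/prefix your flow's results use.
for path, info in fs.find("my-results-bucket/prefect-results", detail=True).items():
    if info["size"] > 1_000_000:  # flag anything larger than ~1 MB
        print(path, info["size"])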
Sean Harkins
05/28/2021, 8:21 PM
Sean Harkins
05/28/2021, 8:23 PM
mapped call. https://prefect-community.slack.com/archives/CL09KU1K7/p1622223606139900?thread_ts=1622219111.124500&cid=CL09KU1K7
Zanie
PREFECT__FLOWS__CHECKPOINTING=False in the environment for your run, but this should not have any effect on the flow container.
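(If it's useful, a sketch of one way to set that, reusing the run config from earlier in the thread; ECSRun's env argument adds environment variables to the flow run container:)
from prefect.run_configs import ECSRun

run_config = ECSRun(
    image=cluster.worker_image,
    labels=[recipe_bakery.id],
    task_definition=definition,
    # Disable result checkpointing for this flow run.
    env={"PREFECT__FLOWS__CHECKPOINTING": "false"},
    run_task_kwargs={
        "tags": [
            {"key": "Project", "value": "pangeo-forge"},
            {"key": "Recipe", "value": recipe_name},
        ]
    },
)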
Sean Harkins
05/28/2021, 8:27 PM
Zanie
None?
Zanie
Sean Harkins
05/28/2021, 8:32 PM
Sean Harkins
05/28/2021, 8:39 PM
Sean Harkins
05/28/2021, 8:40 PM
Zanie
None - I'm a bit confused by the class-with-closure stuff that's going on there, and I can't really dig into pangeo's implementation details. The flow container should just be holding on to the return values from tasks and the states of the tasks, which should fit easily into 32GB of memory with what you've outlined. If you can isolate where the memory consumption is coming from, I can dig in some more and help find a solution, but I'm reaching the extent of what I can do from here.
Zanie
Sean Harkins
05/28/2021, 10:04 PM
Zanie
Sean Harkins
05/28/2021, 10:26 PM
Zanie
Sean Harkins
05/28/2021, 10:26 PM
Zanie
Sean Harkins
05/28/2021, 10:27 PM
Zanie
Zanie
Sean Harkins
05/28/2021, 10:28 PM
Sean Harkins
05/29/2021, 7:35 PM
tracemalloc instrumentation around flow.run() using LocalRun with a DaskExecutor pointing to my Dask cluster running on Fargate. On the local container where flow.run() is executing, I am seeing a rapid and steady increase in memory utilization prior to the OOM crash.
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=66.9 MiB, count=6, average=11.2 MiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=1156 MiB, count=120, average=9868 KiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=2408 MiB, count=251, average=9826 KiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=3603 MiB, count=377, average=9787 KiB
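(For reference, a rough sketch of this kind of instrumentation; it assumes the flow object from the examples later in the thread and simply logs the top allocation sites on an interval:)
import logging
import threading
import time
import tracemalloc

logging.basicConfig(level=logging.INFO)

def log_top_allocations(interval=30, limit=5):
    # Periodically snapshot allocations and log the largest sources, grouped by line.
    while True:
        time.sleep(interval)
        snapshot = tracemalloc.take_snapshot()
        for stat in snapshot.statistics("lineno")[:limit]:
            logging.info(stat)

tracemalloc.start(25)  # keep up to 25 frames so allocation tracebacks can be inspected
threading.Thread(target=log_top_allocations, daemon=True).start()
flow.run()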
Sean Harkins
05/29/2021, 7:37 PM
Sean Harkins
05/30/2021, 5:28 PM
INFO:root:27 memory blocks: 1047174.9 KiB
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 890
INFO:root: self._bootstrap_inner()
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 932
INFO:root: self.run()
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 870
INFO:root: self._target(*self._args, **self._kwargs)
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/concurrent/futures/thread.py", line 80
INFO:root: work_item.run()
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/concurrent/futures/thread.py", line 57
INFO:root: result = self.fn(*self.args, **self.kwargs)
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py", line 1468
INFO:root: return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/utils.py", line 31
INFO:root: protocol.dumps(
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 70
INFO:root: frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
INFO:root: File "/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py", line 35
INFO:root: return Packer(**kwargs).pack(o)
Killed
Zanie
pip install prefect@git+https://github.com/PrefectHQ/prefect@3eec97aa4182d93ac02f426abcd34b2780726d7e#egg=prefect
then run your flow with python -m memory_profiler flow.py
Zanie
Sean Harkins
06/01/2021, 9:18 PM
submit method? I'm tracking how this may be related to an upstream issue about whether our task's underlying class can be serialized, but as Prefect does not actually construct a task graph I'd like to better understand how Tasks are sent to the Dask scheduler. Can you point me to this code in the Prefect repo?
Zanie
TaskRunner.run is submitted for each task at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/flow_runner.py#L548-L549
Sean Harkins
06/03/2021, 3:19 PM
@profile decorator results in ValueError: Tasks with variable positional arguments (*args) are not supported on registration. And when I extract the task's body into another function and use the @profile decorator there, the logs just report ERROR: Could not find file flow.py. What is the best approach for instrumenting tasks for memory profiling?
Zanie
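(One possible workaround, sketched here rather than taken from the thread: keep the @task signature intact and call memory_profiler.memory_usage around a plain helper at run time; _download is a hypothetical stand-in for the task body, and memory_usage's return type varies slightly across memory_profiler versions:)
import prefect
from memory_profiler import memory_usage
from prefect import task

def _download(source_url, cache_location):
    ...  # plain-function version of the task body


@task
def download(source_url, cache_location):
    # Profile the helper at call time instead of decorating the task itself,
    # so Prefect still sees the task's explicit signature.
    peak, target_url = memory_usage(
        (_download, (source_url, cache_location), {}),
        max_usage=True,
        retval=True,
    )
    prefect.context.get("logger").info("download peak memory: %s MiB", peak)
    return target_url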
Sean Harkins
06/03/2021, 3:36 PM
Sean Harkins
06/03/2021, 3:41 PM
@task
def source_url(day: str) -> str:
    day = pd.Timestamp(day)
    source_url_pattern = (
        "https://www.ncei.noaa.gov/data/"
        "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
        "{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
    )
    return source_url_pattern.format(day=day)


@task
def download(source_url, cache_location):
    target_url = os.path.join(cache_location, str(hash(source_url)))
    try:
        fsspec.open(target_url).open()
        return target_url
    except FileNotFoundError:
        pass
    with fsspec.open(source_url, mode="rb") as source:
        with fsspec.open(target_url, mode="wb") as target:
            target.write(source.read())
    return target_url


with Flow(
    name,
    storage=storage.S3(bucket="pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9"),
    run_config=run_config,
    executor=dask_executor,
) as flow:
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").tolist(),
    )
    sources = source_url.map(days)
    nc_sources = download.map(
        sources,
        cache_location=unmapped(f"s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/{name}/cache"),
    )
Sean Harkins
06/03/2021, 3:44 PM
Sean Harkins
06/03/2021, 3:45 PM
list like this
def source_url(day: str) -> str:
    day = pd.Timestamp(day)
    source_url_pattern = (
        "https://www.ncei.noaa.gov/data/"
        "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
        "{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
    )
    return source_url_pattern.format(day=day)


@task
def download(source_url, cache_location):
    target_url = os.path.join(cache_location, str(hash(source_url)))
    try:
        fsspec.open(target_url).open()
        return target_url
    except FileNotFoundError:
        pass
    with fsspec.open(source_url, mode="rb") as source:
        with fsspec.open(target_url, mode="wb") as target:
            target.write(source.read())
    return target_url


with Flow(
    name,
    storage=storage.S3(bucket="pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9"),
    run_config=run_config,
    executor=dask_executor,
) as flow:
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").tolist(),
    )
    sources = list(map(source_url, pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").to_list()))
    nc_sources = download.map(
        sources,
        cache_location=unmapped(f"s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/{name}/cache"),
    )
Sean Harkins
06/03/2021, 3:47 PM
When source_url is a vanilla function rather than a task, I see full parallel worker utilization like I would have expected in the other example.
Sean Harkins
06/03/2021, 3:47 PM
Zanie
Zanie
Zanie
Sean Harkins
06/03/2021, 5:26 PM
tracemalloc statements directly in my task to track things rather than using the memory_profiler.
Zanie
tracemalloc.start() directly in your task?
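(A rough sketch of what that could look like inside a task, using the download task from the earlier example as a stand-in; tracemalloc is standard library, so nothing extra needs to be installed on the workers:)
import tracemalloc

import prefect
from prefect import task

@task
def download(source_url, cache_location):
    logger = prefect.context.get("logger")
    tracemalloc.start()
    try:
        ...  # existing download logic from the earlier example
    finally:
        # Log the biggest allocation sites seen while this task ran on the worker.
        snapshot = tracemalloc.take_snapshot()
        for stat in snapshot.statistics("lineno")[:5]:
            logger.info(stat)
        tracemalloc.stop()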
Sean Harkins
06/03/2021, 5:27 PM
Zanie
Zanie
Sean Harkins
06/03/2021, 6:56 PM