
Payam Vaezi

12/13/2021, 3:09 PM
For some reason I'm getting very different memory usage when running Prefect local Dask with a mapped task using Prefect Core vs. when running the same workflow with Prefect Server. For my workflow I'm getting at most about 3 GiB of memory usage, while running in the cloud with Prefect Server I'm getting above 16 GiB, and the job gets killed as a result. Any idea what may have caused this discrepancy in memory usage?

Kevin Kho

12/13/2021, 3:16 PM
Are you still using LocalDaskExecutor with prefect server? Or are you using DaskExecutor?

Payam Vaezi

12/13/2021, 3:32 PM
I'm still using the local Dask executor with Prefect Server. The workflow is exactly the same; I tried with a 32 GiB memory limit and ran out of memory again!

Kevin Kho

12/13/2021, 3:47 PM
We have known memory issues for DaskExecutor but not LocalDaskExecutor. flow.run() also uses the executor to run the Flow, so this is surprising. Could you show me how you define your executor?

Payam Vaezi

12/13/2021, 3:57 PM
Copy code
import prefect

prefect_executor_kwargs={"scheduler": "processes", "num_workers": 8, "threads_per_worker": 1}
flow = prefect.Flow(name="test", executor=prefect.executors.LocalDaskExecutor(**prefect_executor_kwargs))
Also, curious, what is the known issue with the Dask executor? Because we are thinking about scaling horizontally after we get successful runs with local Dask in the cloud.
In the worst case, each process in our case has to handle about 780 MiB, and we are processing 100 mapped tasks in our example. So in my mind it should not require more than 780 MiB × 8 (roughly 6 GiB) of memory.

Kevin Kho

12/13/2021, 4:00 PM
It makes copies of a lot of things. I hope to get a fix out this week: https://github.com/PrefectHQ/prefect/pull/5004

Payam Vaezi

12/13/2021, 4:03 PM
Thanks for sharing this; we can use local Dask until then. Any idea why the memory footprint is different in Prefect Core than in Server?

Kevin Kho

12/13/2021, 4:05 PM
This looks right. I think the memory is then coming from returning large objects, like DataFrames, that are checkpointed. You may lower your memory footprint by doing something like this:
Copy code
@task()
def something(location):
    df = pd.read_csv(location)
    df['new_col'] = ...
    df.to_csv(location)
    return location
or you can also try turning checkpointing off so the object doesn’t get persisted
Copy code
@task(checkpoint=False)
def something(location):
    df = pd.read_csv(location)
    return df
The Result interface gives an easy way to save this:
Copy code
@task()
def something(location):
    s3_result = S3Result(..., location=...)
    df = s3_result.read(location).value
    df['new_col'] = ...
    s3_result.write(df, location=...)
    return location

Payam Vaezi

12/13/2021, 4:08 PM
Checkpointing is off for all tasks already in this format:
Copy code
from prefect import Task


class PrefectTransformer(Task):
    def __init__(self, **kwargs):
        super().__init__(name="test", checkpoint=False)
    ...

    def run(self, **kwargs):
        ...
And the data structure being passed is a dictionary (instead of a pandas DataFrame).

Kevin Kho

12/13/2021, 4:10 PM
Roughly how big is that dict?

Payam Vaezi

12/13/2021, 4:10 PM
780 MiB in memory for one task when I profiled it.
This is from a local Prefect Core run on my machine; it shows the 8 processes properly working through the tasks, but with Prefect Server the memory keeps increasing until it blows past the container memory limit.

Kevin Kho

12/13/2021, 4:12 PM
What RunConfig are you using? I vaguely recall something with ECS, but not super sure. I can dig if you are on ECS.
Ah wait, sorry, I think this is expected now, because results are held in memory over a mapped operation when using a run with Server or Cloud.
Read this

Payam Vaezi

12/13/2021, 4:17 PM
We are using a Kubernetes environment.

Kevin Kho

12/13/2021, 4:17 PM
Meaning your Flow will try to hold 78 GB (100 mapped results at roughly 780 MiB each). It would be better if you persisted the data and had downstream tasks retrieve it from storage.
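For illustration, a rough sketch of that pattern (the task names, file paths, and CSV round trip here are placeholders, not the actual flow code): each mapped task writes its output to storage and returns only the location string, so the flow run holds 100 short strings instead of 100 ~780 MiB payloads.
Copy code
import pandas as pd

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task(checkpoint=False)
def process_chunk(location: str) -> str:
    """Transform one chunk, persist it, and return only the output path."""
    df = pd.read_csv(location)
    # ... heavy per-chunk transformation here ...
    out_location = location.replace(".csv", ".processed.csv")
    df.to_csv(out_location, index=False)
    return out_location  # a short string is held for each mapped child, not the DataFrame


@task(checkpoint=False)
def sink(out_location: str) -> None:
    """Downstream task re-reads from storage instead of receiving the object itself."""
    df = pd.read_csv(out_location)
    # ... write df to its final destination ...


with Flow("test") as flow:
    locations = [f"chunk_{i}.csv" for i in range(100)]  # placeholder inputs
    outputs = process_chunk.map(locations)
    sink.map(outputs)

flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)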

Payam Vaezi

12/13/2021, 4:19 PM
This is unfortunate, because we need to process a large amount of data, and the chunks we want to process at a time don't fit in memory. Is there any way we can replicate the Prefect Core behavior we're seeing when running against the server?

Kevin Kho

12/13/2021, 4:20 PM
I’ll ask the team about that

Payam Vaezi

12/13/2021, 4:20 PM
The task downstream of the one causing the problem is actually sinking the file to storage for us. We don't want to write all the logic in the same task, so that we keep modularity in the graph.
@David Harrington FYI

Anna Geller

12/13/2021, 10:33 PM
@Payam Vaezi A couple more questions, to gather more information and understand the difference between the behavior on Core vs. Server:
1. How is your Server deployed - do you run it also on Kubernetes, or on a VM with docker-compose?
2. What agent type do you use - does this agent run in the same environment as Server?
3. How do you define your run_config? Can you share its exact definition?
4. Are your tasks CPU-bound or IO-bound?
5. I understood the LocalDaskExecutor is defined the same way in the Core and Server tests, correct? You mentioned that locally you use a local process, while on Server everything runs in containers? Perhaps there are resource limitations set on the containers that cause those discrepancies?

Payam Vaezi

12/14/2021, 4:07 PM
Hey Anna, thanks for following up:
1. It's deployed to the same k8s cluster via Helm and Argo, using the open-source Helm charts for Prefect Server.
2. Using the k8s agent. Yes, it runs in the same namespace as the Server and the jobs.
3. We have a template and override it at flow run time. Sharing it in the next message separately.
4. For this flow, it is IO-bound and memory-bound.
5. It is the same executor as when running Prefect Core on my local machine, with the same executor specs when running with Prefect Server. In the run config, as you can see, we need to specify some container resource limits (because we share cluster nodes with other people). That container resource limit is what's causing the memory-killed jobs.
Here is the run config; I omitted the image tag, some env vars, and the deployment labels, since some of them are enterprise-specific values that we can't share:
Copy code
{
    'env': None,
    'type': 'KubernetesRun',
    'image': '***',
    'labels': ['default'],
    'cpu_limit': '8000m',
    '__version__': '0.15.7',
    'cpu_request': '8000m',
    'job_template': {
        'kind': 'Job',
        'spec': {
            'template': {
                'spec': {
                    'containers': [
                        {
                            'env': [
                                {'name': 'PREFECT__LOGGING__EXTRA_LOGGERS', 'value': "['mlaasflow']"},
                                {'name': 'PREFECT__CLOUD__HEARTBEAT_MODE', 'value': 'thread'},
                            ],
                            'name': 'flow',
                            'image': '***',
                            'resources': {
                                'limits': {'cpu': '200m'},
                                'requests': {'cpu': '200m'},
                            },
                            'imagePullPolicy': 'IfNotPresent',
                            'securityContext': {'readOnlyRootFilesystem': False},
                        }
                    ],
                    'restartPolicy': 'Never',
                    'securityContext': {
                        'runAsUser': 1000,
                        'capabilities': {'drop': ['ALL']},
                        'runAsNonRoot': True,
                        'readOnlyRootFilesystem': False,
                    },
                    'imagePullSecrets': [{'name': ''}],
                }
            }
        },
        'metadata': {'name': 'prefect-job', 'labels': {}},
        'apiVersion': 'batch/v1',
    },
    'memory_limit': '16384Mi',
    'memory_request': '16384Mi',
    'image_pull_policy': None,
    'job_template_path': None,
    'image_pull_secrets': None,
    'service_account_name': None,
}

Anna Geller

12/14/2021, 4:43 PM
@Payam Vaezi Thanks for sharing more info! If your flow is IO-bound then threads would be better than processes. Here are some things you can try: #1 Use a default configuration of LocalDaskExecutor():
Copy code
flow.executor = LocalDaskExecutor()
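# with no arguments, LocalDaskExecutor defaults to the threaded scheduler, which suits IO-bound tasks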
#2 Try providing a custom job template to your KubernetesRun - I know you assigned 8000m as CPU request for your flow’s Kubernetes job, but this still looks suspicious:
Copy code
"resources": {
"limits": {"cpu": "200m"},
"requests": {"cpu": "200m"},
},
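For example, a sketch only (the image and resource values below are placeholders, not a recommendation): the flow container in a custom job template can be given resources consistent with the flow-level requests, instead of the 200m limit shown above.
Copy code
from prefect.run_configs import KubernetesRun

# Placeholder job template: give the "flow" container resources that match
# what the run actually needs, rather than the 200m CPU limit shown above.
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "flow",
                        "resources": {
                            "requests": {"cpu": "8000m", "memory": "16384Mi"},
                            "limits": {"cpu": "8000m", "memory": "16384Mi"},
                        },
                    }
                ],
                "restartPolicy": "Never",
            }
        }
    },
}

# assuming `flow` is the Flow object defined elsewhere
flow.run_config = KubernetesRun(
    image="my-registry/my-flow:latest",  # placeholder image
    job_template=job_template,
    cpu_request="8000m",
    cpu_limit="8000m",
    memory_request="16384Mi",
    memory_limit="16384Mi",
)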

Payam Vaezi

12/14/2021, 5:23 PM
Removed those resources from the config containers and switched to threads; still seeing the same unbounded memory growth issue.
I'm trying to add a memory profiler and run Prefect Core in the cloud; I'll update with what I find there.
In our tests, we pinpointed most of the memory discrepancy as coming from reading CSVs with pandas - the pandas objects linger in memory. It seems unrelated to Prefect, since when I switched to reading JSON into a dictionary, the memory overhead was mostly resolved.
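(As an aside, a minimal sketch of that kind of in-task memory spot check, using only the standard library's tracemalloc; the task name and body are illustrative, and tracemalloc only sees allocations made through Python's allocator, so treat the numbers as indicative.)
Copy code
import tracemalloc

import prefect
from prefect import task


@task
def profiled_step(location: str) -> str:
    """Illustrative task: measure peak memory of whatever runs in between."""
    logger = prefect.context.get("logger")
    tracemalloc.start()
    # ... read and transform the chunk here ...
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    logger.info("current=%.1f MiB, peak=%.1f MiB for %s", current / 2**20, peak / 2**20, location)
    return location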

Anna Geller

12/16/2021, 10:11 AM
nice work!