# prefect-server
p
For some reason I’m getting very different memory usage when running Prefect local dask with a mapped task using Prefect Core vs. running the same workflow with Prefect Server. For my workflow I’m getting about 3GiB max memory usage, while running in the cloud with Prefect Server I’m getting above 16GiB, and the job gets killed as a result. Any idea what may have caused this discrepancy in memory usage?
k
Are you still using LocalDaskExecutor with prefect server? Or are you using DaskExecutor?
p
I’m still using the local dask executor with Prefect Server. The workflow is exactly the same; I tried with a 32GiB memory limit and ran out of memory again!
k
We have known memory issues for DaskExecutor but not LocalDaskExecutor. flow.run() also uses the executor to run the Flow, so this is surprising. Could you show me how you define your executor?
p
Copy code
import prefect

prefect_executor_kwargs = {"scheduler": "processes", "num_workers": 8, "threads_per_worker": 1}
flow = prefect.Flow(name="test", executor=prefect.executors.LocalDaskExecutor(**prefect_executor_kwargs))
Also, curious, what is the known issue with the Dask executor? Because we are thinking about scaling horizontally after we get successful runs with local dask in the cloud.
In the worst case each process in our example must hold about 780MiB, and we are processing 100 mapped tasks. So in my mind it should not require more than 780MiB * 8 of memory.
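Back-of-the-envelope, that expectation works out like this (just the figures above, nothing else assumed):
Copy code
# Expected peak with the processes scheduler: at most num_workers payloads at once.
per_task_mib = 780   # worst-case payload per mapped task
num_workers = 8      # num_workers passed to LocalDaskExecutor

expected_peak_gib = per_task_mib * num_workers / 1024
print(f"expected peak ~{expected_peak_gib:.1f} GiB")  # ~6.1 GiB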
k
It makes copies of a lot of things. I hope to get a fix out this week: https://github.com/PrefectHQ/prefect/pull/5004
p
Thanks for sharing this, we can use local dask till then. Any idea why the memory footprint is different between Prefect Core and Server?
k
This looks right. I think this memory is then coming from returning large objects, like DataFrames, that are checkpointed. You may lower your memory footprint by doing something like this:
Copy code
import pandas as pd
from prefect import task

@task()
def something(location):
    # Read, transform, and write back in place, then return only the
    # small location string instead of the large DataFrame.
    df = pd.read_csv(location)
    df['new_col'] = ...
    df.to_csv(location)
    return location
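Downstream tasks then only receive the small path string and re-read the data themselves, something like this sketch (the load_output task and file names are made up; `something` is the task defined just above):
Copy code
import pandas as pd
from prefect import Flow, task

@task
def load_output(location):
    # Re-read from the small path handed over by the upstream task.
    return pd.read_csv(location)

with Flow("example") as flow:
    # Hypothetical input files, mapped over the `something` task above.
    paths = something.map(["part-0.csv", "part-1.csv"])
    load_output.map(paths)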
Or you can try turning checkpointing off so the object doesn’t get persisted:
Copy code
import pandas as pd
from prefect import task

@task(checkpoint=False)  # don't persist the returned DataFrame as a Result
def something(location):
    df = pd.read_csv(location)
    return df
The Result interface gives an easy way to save this:
Copy code
from prefect import task
from prefect.engine.results import S3Result

@task()
def something(location):
    s3_result = S3Result(bucket=..., location=...)
    # read() returns a Result; the loaded object is on its .value attribute
    df = s3_result.read(location).value
    df['new_col'] = ...
    # persist the DataFrame and hand only the small location downstream
    s3_result.write(df, location=...)
    return location
p
Checkpointing is off for all tasks already in this format:
Copy code
from prefect import Task

class PrefectTransformer(Task):
    def __init__(self, **kwargs):
        super().__init__(name="test", checkpoint=False)
    ...

    def run(self, **kwargs):
        ...
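For context, roughly how such a task gets wired into the flow (a sketch only, the payload kwarg and inputs are made up, not our actual code):
Copy code
from prefect import Flow
from prefect.executors import LocalDaskExecutor

transformer = PrefectTransformer()

with Flow(
    "test",
    executor=LocalDaskExecutor(scheduler="processes", num_workers=8),
) as flow:
    # 100 mapped inputs, mirroring the example above; the kwarg name is made up.
    outputs = transformer.map(payload=list(range(100)))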
And the data structure being passed is a dictionary (instead of a pandas DataFrame).
k
Roughly how big is that dict?
p
780MiB in memory for 1 task when I profiled it.
This is from a local Prefect Core run on my machine; it shows 8 processes properly picking up tasks, but with Prefect Server the memory keeps increasing until it blows past the container memory limit.
k
What RunConfig are you using? I vaguely recall something with ECS, but I’m not super sure. I can dig if you are on ECS.
Ah wait, sorry, I think this is expected now, because results are held in memory over a mapped operation when using a run with Server or Cloud.
Read this
p
We are using the Kubernetes environment.
k
Meaning your Flow will try to hold ~78GB. It would be better if you persisted it for retrieval by downstream tasks.
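Rough math with the numbers from earlier in the thread:
Copy code
# On Server/Cloud runs the flow runner holds every mapped result at once.
per_task_mib = 780
num_mapped_tasks = 100

total_gib = per_task_mib * num_mapped_tasks / 1024
print(f"~{total_gib:.0f} GiB held in memory")  # ~76 GiB, same ballpark as the ~78GB above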
p
This is unfortunate, because we need to process a large amount of data, and the chunks we want to process at a time don’t fit in memory. Is there any way we can replicate the Prefect Core behavior we are seeing on the server?
k
I’ll ask the team about that
p
The downstream of the problematic task actually sinks the file to storage for us. We don’t want to write all the logic in a single task, so that we keep modularity in the graph.
@David Harrington FYI
a
@Payam Vaezi A couple more questions to try to gather more information and understand the difference between the behavior on Core vs Server:
1. How is your Server deployed - do you run it also on Kubernetes, or on a VM with docker-compose?
2. What agent type do you use - does this agent run in the same environment as Server?
3. How do you define your run_config? Can you share its exact definition?
4. Are your tasks CPU-bound or IO-bound?
5. I understood the LocalDaskExecutor is defined the same way in the Core and Server test, correct? You mentioned locally you use a local process while on Server everything runs in containers? Perhaps there are some resource limitations set on the containers that cause those discrepancies?
p
Hey Anna, thanks for following up:
1. It’s deployed to the same k8s cluster via Helm and Argo, using the open-source Helm charts of Prefect Server.
2. Using the k8s agent. Yes, it is running in the same namespace as the server and the jobs.
3. We have a template and override it at flow run time. Sharing it in the next message separately.
4. For this current flow, it is IO-bound and memory-bound.
5. It is the same executor for running Prefect Core on my local machine, as well as the same executor specs for running in Prefect Server. In the run config, as you see, we need to specify some container resource limits (because we are sharing the cluster node with other people). That container resource limit is what causes the memory-killed jobs.
Here is the run config. I omitted the image tag, some env vars, and the deployment labels, since some of them are enterprise-specific values that we can’t share:
Copy code
{'env': None, 'type': 'KubernetesRun', 'image': '***', 'labels': ['default'], 'cpu_limit': '8000m', '__version__': '0.15.7', 'cpu_request': '8000m', 'job_template': {'kind': 'Job', 'spec': {'template': {'spec': {'containers': [{'env': [{'name': 'PREFECT__LOGGING__EXTRA_LOGGERS', 'value': "['mlaasflow']"}, {'name': 'PREFECT__CLOUD__HEARTBEAT_MODE', 'value': 'thread'}], 'name': 'flow', 'image': '***', 'resources': {'limits': {'cpu': '200m'}, 'requests': {'cpu': '200m'}}, 'imagePullPolicy': 'IfNotPresent', 'securityContext': {'readOnlyRootFilesystem': False}}], 'restartPolicy': 'Never', 'securityContext': {'runAsUser': 1000, 'capabilities': {'drop': ['ALL']}, 'runAsNonRoot': True, 'readOnlyRootFilesystem': False}, 'imagePullSecrets': [{'name': ''}]}}}, 'metadata': {'name': 'prefect-job', 'labels': {}}, 'apiVersion': 'batch/v1'}, 'memory_limit': '16384Mi', 'memory_request': '16384Mi', 'image_pull_policy': None, 'job_template_path': None, 'image_pull_secrets': None, 'service_account_name': None}
a
@Payam Vaezi Thanks for sharing more info! If your flow is IO-bound then threads would be better than processes. Here are some things you can try: #1 Use a default configuration of LocalDaskExecutor():
Copy code
flow.executor = LocalDaskExecutor()  # defaults to the "threads" scheduler
#2 Try providing a custom job template to your KubernetesRun - I know you assigned 8000m as CPU request for your flow’s Kubernetes job, but this still looks suspicious:
Copy code
"resources": {
"limits": {"cpu": "200m"},
"requests": {"cpu": "200m"},
},
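For #2, something along these lines might be cleaner (a sketch only, reusing the values from the run config you shared rather than your exact job template), letting KubernetesRun own the job’s resources in one place instead of also pinning a 200m CPU limit inside the custom job template’s container:
Copy code
from prefect import Flow
from prefect.run_configs import KubernetesRun

flow = Flow("test")  # stand-in for the flow defined earlier

flow.run_config = KubernetesRun(
    image="***",                # placeholder, as in the config shared above
    labels=["default"],
    cpu_request="8000m",
    cpu_limit="8000m",
    memory_request="16384Mi",
    memory_limit="16384Mi",
)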
p
Removed those resources from the config containers, and am using threads, but still seeing the same unbounded memory growth issue.
I’m trying to add a memory profiler and run Prefect Core in the cloud; will update on what I find there.
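One lightweight option for a rough per-task number, without an external profiler, is the standard-library tracemalloc, e.g. something like this sketch (the profiled_read task is just an illustration):
Copy code
import tracemalloc

import pandas as pd
import prefect
from prefect import task

@task
def profiled_read(location):
    # Snapshot Python-level allocations around the read under suspicion.
    logger = prefect.context.get("logger")
    tracemalloc.start()
    df = pd.read_csv(location)
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    logger.info("peak allocation ~%.0f MiB", peak / 2**20)
    return df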
In our tests, we pinpointed most of the memory discrepancy as coming from reading CSVs in pandas, with the pandas objects lingering in memory. It seems unrelated to Prefect, since when I switched to reading JSON into a dictionary the memory overhead was mostly resolved.
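Roughly what the JSON version looks like (a sketch, the read_chunk task and file handling are placeholders):
Copy code
import json

from prefect import task

@task(checkpoint=False)
def read_chunk(location):
    # Plain dicts instead of DataFrames, so nothing large lingers after the task.
    with open(location) as f:
        return json.load(f)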
a
nice work!