    Payam Vaezi

    9 months ago
    For some reason I’m seeing very different memory usage when running a mapped task on local Dask with Prefect Core versus running the same workflow with Prefect Server. With Core, the workflow peaks at about 3GiB of memory, while in the cloud with Prefect Server it climbs above 16GiB and the job gets killed as a result. Any idea what may have caused this discrepancy in memory usage?
    Kevin Kho

    9 months ago
    Are you still using LocalDaskExecutor with prefect server? Or are you using DaskExecutor?
    Payam Vaezi

    9 months ago
    I’m still using the local Dask executor with Prefect Server. The workflow is exactly the same. I tried a 32GiB memory limit and ran out of memory again!
    Kevin Kho

    9 months ago
    We have known memory issues for DaskExecutor but not LocalDaskExecutor. flow.run() also uses the executor to run the Flow, so this is surprising. Could you show me how you define your executor?
    Payam Vaezi

    9 months ago
    import prefect
    
    prefect_executor_kwargs={"scheduler": "processes", "num_workers": 8, "threads_per_worker": 1}
    flow = prefect.Flow(name="test", executor=prefect.executors.LocalDaskExecutor(**prefect_executor_kwargs))
    Also, curious: what is the known issue with the Dask executor? We are thinking about scaling horizontally once we get successful runs with local Dask in the cloud.
    In the worst case, each process in our workflow handles 780MiB, and we are processing 100 mapped tasks in our example. So in my mind it should not require more than 780MiB * 8 of memory.
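As a rough sanity check on the estimate above, the expected ceiling with 8 worker processes can be worked out directly. This is a back-of-the-envelope sketch: it assumes each worker holds at most one 780MiB payload at a time and ignores interpreter and serialization overhead. The variable names are illustrative only.

```python
# Back-of-the-envelope peak memory if only the active workers hold data.
# Assumption: one in-flight payload per worker, no copies, no overhead.
PAYLOAD_MIB = 780   # worst-case size of one task's data (from profiling)
NUM_WORKERS = 8     # num_workers passed to LocalDaskExecutor

expected_peak_mib = PAYLOAD_MIB * NUM_WORKERS
expected_peak_gib = expected_peak_mib / 1024

print(f"{expected_peak_mib} MiB ~= {expected_peak_gib:.2f} GiB")
```

Under these assumptions the estimate sits well below the 16GiB limit, which is why the observed usage looks surprising.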
    Kevin Kho

    9 months ago
    It makes copies of a lot of things. I hope to get a fix out this week: https://github.com/PrefectHQ/prefect/pull/5004
    Payam Vaezi

    9 months ago
    Thanks for sharing this, we can use local Dask until then. Any idea why the memory footprint differs between Prefect Core and Server?
    Kevin Kho

    9 months ago
    This looks right. I think the memory is coming from returning large objects, such as DataFrames, that are then checkpointed. You may be able to lower your memory footprint by doing something like this:
    @task()
    def something(location):
        df = pd.read_csv(location)
        df['new_col'] = ...
        df.to_csv(location)
        return location
    Or you can try turning checkpointing off so the object doesn’t get persisted:
    @task(checkpoint=False)
    def something(location):
        df = pd.read_csv(location)
        return df
    The Result interface gives an easy way to save this:
    @task()
    def something(location):
        s3_result = S3Result(..., location=...)
        df = s3_result.read(location).value  # read() returns a Result; the data is in .value
        df['new_col'] = ...
        s3_result.write(..., location=...)
        return location
    Payam Vaezi

    9 months ago
    Checkpointing is off for all tasks already in this format:
    class PrefectTransformer(Task):
        def __init__(self, **kwargs):
            super().__init__(name="test", checkpoint=False)
        ...
    
        def run(self, **kwargs):
            ...
    And the data structure being passed is a dictionary (not a pandas DataFrame).
    Kevin Kho

    9 months ago
    Roughly how big is that dict?
    Payam Vaezi

    9 months ago
    780MiB in memory for 1 task when I profiled it.
    This is from a local Prefect Core run on my machine: the 8 processes pick up tasks as expected. With Prefect Server, though, memory keeps increasing until it blows through the container memory limit.
    Kevin Kho

    9 months ago
    What RunConfig are you using? I vaguely recall something with ECS, but I'm not sure. I can dig if you are on ECS.
    Ah wait, sorry, I think this is expected, because results are held in memory over a mapped operation when the run goes through Server or Cloud.
    Read this
    Payam Vaezi

    9 months ago
    We are using a Kubernetes environment.
    Kevin Kho

    9 months ago
    Meaning your Flow will try to hold 78GB. It would be better if you persisted it for retrieval with downstream tasks
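The figure above follows from multiplying the per-task payload by the number of mapped children. A quick sketch of that arithmetic, assuming every one of the 100 mapped results (780MiB each, per the profiling mentioned earlier) stays resident at the same time; the names are illustrative:

```python
# Rough total if every mapped result is kept in memory simultaneously.
PAYLOAD_MIB = 780              # size of one mapped task's result
NUM_MAPPED_TASKS = 100         # number of mapped children in the example
MEMORY_LIMITS_GIB = (16, 32)   # container limits tried so far

held_mib = PAYLOAD_MIB * NUM_MAPPED_TASKS
held_gib = held_mib / 1024

for limit in MEMORY_LIMITS_GIB:
    status = "exceeds" if held_gib > limit else "fits within"
    print(f"{held_gib:.1f} GiB {status} the {limit} GiB limit")
```

That works out to roughly 76GiB, the same ballpark as the 78GB quoted, and far beyond both limits that were tried.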
    Payam Vaezi

    9 months ago
    This is unfortunate, because we need to process a large amount of data that doesn't fit in memory, so we want to work on a few chunks at a time. Is there any way to get the behavior we see in Prefect Core to happen on the Server?
    Kevin Kho

    9 months ago
    I’ll ask the team about that
    Payam Vaezi

    9 months ago
    The task downstream of the one causing the problem is the one that sinks the file to storage for us. We don’t want to put all the logic in a single task, because we want to keep the graph modular.
    @David Harrington FYI
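One way to keep the transform and sink steps separate while still avoiding large in-memory results is to pass locations between them instead of the data itself. The sketch below uses plain Python functions and local JSON files as stand-ins. In a Prefect flow these would be separate mapped tasks, and the storage would likely be something like S3. The function names, payload, and file layout are hypothetical and not taken from the workflow discussed here.

```python
import json
import tempfile
from pathlib import Path


def transform_chunk(chunk_id: int, out_dir: Path) -> str:
    """Build one chunk, persist it, and return only its location."""
    data = {"chunk": chunk_id, "values": list(range(5))}  # stand-in payload
    path = out_dir / f"chunk_{chunk_id}.json"
    path.write_text(json.dumps(data))
    return str(path)  # a short string is cheap to hold for every mapped child


def sink_chunk(location: str) -> int:
    """Separate downstream step: load the chunk by location and sink it."""
    data = json.loads(Path(location).read_text())
    return len(data["values"])  # placeholder for the real upload logic


with tempfile.TemporaryDirectory() as tmp:
    locations = [transform_chunk(i, Path(tmp)) for i in range(3)]
    sunk = [sink_chunk(loc) for loc in locations]

print(sunk)
```

Only the path strings travel between the two steps, so whatever the orchestrator retains per mapped child stays small regardless of chunk size.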
    Anna Geller

    9 months ago
    @Payam Vaezi A couple more questions to gather information and understand the difference in behavior between Core and Server:
    1. How is your Server deployed? Do you run it on Kubernetes too, or on a VM with docker-compose?
    2. What agent type do you use? Does the agent run in the same environment as Server?
    3. How do you define your run_config? Can you share its exact definition?
    4. Are your tasks CPU-bound or IO-bound?
    5. As I understand it, the LocalDaskExecutor is defined the same way in the Core and Server tests, correct? You mentioned that locally you use local processes, while on Server everything runs in containers? Perhaps resource limits set on the containers are causing those discrepancies?
    Payam Vaezi

    9 months ago
    Hey Anna, thanks for following up:
    1. It’s deployed to the same k8s cluster via Helm and Argo, using the open-source Helm charts for Prefect Server.
    2. Using the k8s agent. Yes, it is running in the same namespace as the server and the jobs.
    3. We have a template and override it at flow run time. Sharing it in the next message separately.
    4. This flow is IO-bound and memory-bound.
    5. It is the same executor, with the same specs, both when running Prefect Core on my local machine and when running with Prefect Server. As you will see in the run config, we need to specify container resource limits (because we share cluster nodes with other people). That container memory limit is what gets the jobs killed.
    Here is the run config, I omitted image tag, some env vars and labels of deployment, since some of them are our enterprise specific values and we can’t share those:
    {'env': None, 'type': 'KubernetesRun', 'image': '***', 'labels': ['default'], 'cpu_limit': '8000m', '__version__': '0.15.7', 'cpu_request': '8000m', 'job_template': {'kind': 'Job', 'spec': {'template': {'spec': {'containers': [{'env': [{'name': 'PREFECT__LOGGING__EXTRA_LOGGERS', 'value': "['mlaasflow']"}, {'name': 'PREFECT__CLOUD__HEARTBEAT_MODE', 'value': 'thread'}], 'name': 'flow', 'image': '***', 'resources': {'limits': {'cpu': '200m'}, 'requests': {'cpu': '200m'}}, 'imagePullPolicy': 'IfNotPresent', 'securityContext': {'readOnlyRootFilesystem': False}}], 'restartPolicy': 'Never', 'securityContext': {'runAsUser': 1000, 'capabilities': {'drop': ['ALL']}, 'runAsNonRoot': True, 'readOnlyRootFilesystem': False}, 'imagePullSecrets': [{'name': ''}]}}}, 'metadata': {'name': 'prefect-job', 'labels': {}}, 'apiVersion': 'batch/v1'}, 'memory_limit': '16384Mi', 'memory_request': '16384Mi', 'image_pull_policy': None, 'job_template_path': None, 'image_pull_secrets': None, 'service_account_name': None}
    Anna Geller

    9 months ago
    @Payam Vaezi Thanks for sharing more info! If your flow is IO-bound then threads would be better than processes. Here are some things you can try: #1 Use a default configuration of LocalDaskExecutor():
    flow.executor = LocalDaskExecutor()
    #2 Try providing a custom job template to your KubernetesRun - I know you assigned 8000m as CPU request for your flow’s Kubernetes job, but this still looks suspicious:
    "resources": {
    "limits": {"cpu": "200m"},
    "requests": {"cpu": "200m"},
    },
    Payam Vaezi

    9 months ago
    Removed those resources from the container config and switched to threads, but I'm still seeing the same unbounded memory growth.
    I’m trying to add a memory profiler and run Prefect Core in the cloud. I will update on what I find there.
    In our tests, we pinpointed most of the memory discrepancy to reading CSVs with pandas: the pandas objects linger in memory. It doesn't seem related to Prefect, since the memory overhead mostly went away when I switched to reading JSON into a dictionary.
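For comparing loaders like this, a small helper around the standard library's tracemalloc can report the peak memory allocated while a function runs. This is only a sketch of one possible approach, not the profiler used in the thread. tracemalloc sees allocations made through Python's allocators in the current process, so it may miss some native allocations and will not match the container's RSS exactly. The helper name and the stand-in payload are hypothetical.

```python
import json
import tracemalloc


def peak_mib(func, *args):
    """Run func(*args) and return (result, peak traced memory in MiB)."""
    tracemalloc.start()
    try:
        result = func(*args)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak / (1024 * 1024)


# Stand-in payload; in practice this would be the file a task reads.
payload = json.dumps({"values": list(range(100_000))})

parsed, peak = peak_mib(json.loads, payload)
print(f"json.loads peak: {peak:.2f} MiB")
```

The same helper could wrap a pandas-based loader to compare the two on identical input, keeping the caveat about native allocations in mind.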
    Anna Geller

    9 months ago
    nice work!