# prefect-server
s
Hey everyone! Hope you had a great weekend! I'm having trouble with my flows' memory footprint. I have a bunch of flows that look like this:
```python
ids = get_ids()
step_1 = do_step_1.map(ids)
step_2 = do_step_2.map(step_1)
step_3 = do_step_3.map(step_2)
```
The number of ids retrieved can vary by a few orders of magnitude and I cannot predict it. The issue I see is that while the flow runs, the memory footprint keeps increasing, which sometimes results in an OOM kill of the pod running the flow. Is there any way to keep the memory footprint near constant with respect to the number of executed mapped tasks in the flow? I understand that the initial mapping requires a fair amount of memory and that there is no way around that. I am running on K8s with a LocalDaskExecutor (threaded) and had hoped that depth-first execution would mean some amount of garbage collection of fully executed branches. Would setting up a `Result` in the mapped tasks help in any way? I tried manually releasing memory inside the task code (mostly with `del` and `gc`) but saw no improvement. Another solution I see would be to run steps 1-3 in their own separate flow, but that would mean spinning up a bunch of ephemeral pods and lengthening the flow overall, I suppose? Thanks!
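To be clear, by setting up a `Result` I mean something along these lines (just a sketch, the bucket name is made up):

```python
from prefect import task
from prefect.engine.results import S3Result

# Made-up bucket, only to illustrate attaching a Result to the mapped tasks
RESULT = S3Result(bucket="my-flow-results")

@task(result=RESULT, checkpoint=True)
def do_step_1(id_):
    ...
```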
a
@Sylvain Hazard here are some ideas: #1 Try to increase the memory request for the flow run’s Kubernetes job:
```python
from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["xxx"],
        cpu_request=0.5,
        memory_request="8Gi",  # as much as you can
    ),
) as flow:
    ...
```
#2 Experiment with switching to a DaskExecutor with an ephemeral Dask cluster (rough sketch below). I believe Dask has a better mechanism for handling memory and could additionally give you a Dask performance report.

#3 Listing this as a last option to try: as you mentioned, separating the memory-intensive part into its own flow could make sense if it lets you allocate memory based on how large the list of IDs is, e.g. a `memory_request` of roughly `len(ids) * 50` MB.
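For #2, a minimal sketch of what the ephemeral Dask cluster setup could look like (the cluster class and adapt bounds are placeholders; `dask_kubernetes` would need to be installed in the flow's image, plus whatever `cluster_kwargs` your cluster requires):

```python
from prefect.executors import DaskExecutor

# Spins up an ephemeral Dask cluster per flow run; adapt the cluster
# class and worker bounds to your environment.
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    adapt_kwargs={"minimum": 1, "maximum": 5},
)
```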
k
DFE (depth-first execution) will not help you here because the task results are held in the task states until the entire flow completes. What would help, I think, is persisting the data:
```python
from prefect import task

@task
def first_task():
    big_thing = ...              # build the large object
    location = save(big_thing)   # persist it (S3, GCS, shared volume, ...) with your own helper
    return location              # only the small location string stays in the task state

@task
def second_task(location):
    big_thing = load(location)   # reload the data inside the downstream task
    return something_small       # return only a small result (or another location)
```
and so on
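If it helps, here is a minimal sketch of what `save`/`load` could be, assuming the flow-run pod can write to some shared storage (plain pickle-to-path purely for illustration; in practice that would be S3/GCS or a mounted volume):

```python
import os
import pickle
import uuid

# Illustrative only: some storage location the flow-run pod can write to
SHARED_DIR = "/data/prefect-intermediate"

def save(obj) -> str:
    os.makedirs(SHARED_DIR, exist_ok=True)
    location = os.path.join(SHARED_DIR, f"{uuid.uuid4()}.pkl")
    with open(location, "wb") as f:
        pickle.dump(obj, f)
    return location  # only this small string is kept in memory by the flow

def load(location: str):
    with open(location, "rb") as f:
        return pickle.load(f)
```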
s
That's a lot of good leads, thanks!