DJ Erraballi

12/08/2020, 10:07 PM
Hi there, I am running into memory utilization issues with some of my Prefect flows, and I'm struggling to figure out why memory doesn't seem to be freed or released between tasks in one specific flow. In the first graph here I have memory utilization for that flow (from CloudWatch metrics): as you can see, memory steadily increases from 12:22 to 12:46 and then stays level until the flow completes. In the second attachment I have the flow and task durations during the run. Memory utilization steadily increases while link_patient_events runs, but after that task completes the memory is never freed. The flow in question is defined as follows, so the link_patient_events task returns nothing other than a boolean: (using Prefect v0.11.5)
Copy code
with SRMFlow(
) as trigger_patient_loader_flow:
    client_abbrev: str = Parameter("client_abbrev", default="")
    event_source_id: int = Parameter("event_source_id", default=None)

    lock_acquired: bool = acquire_event_source_lock(client_abbrev, event_source_id)
    linked: bool = link_patient_events(client_abbrev, event_source_id, lock_acquired)
    aggregated_patients: List[PersonInputProperties] = aggregate_patients(client_abbrev, linked, event_source_id)
    bulk_activity_metadata: dict = get_bulk_activity_metadata(client_abbrev, aggregated_patients)
    loaded: bool = load_aggregated_patients(client_abbrev, aggregated_patients, bulk_activity_metadata)
    marked_complete: bool = mark_event_source_as_completed(client_abbrev, event_source_id, loaded, lock_acquired)
    lock_released: bool = release_event_source_lock(client_abbrev, event_source_id, marked_complete, lock_acquired)
So the main thing I am wondering is why memory is not released after the task completes. (It is possible that the task itself has a memory leak, but as far as I can tell we aren't making any native function calls that would behave that way.)

Chris White

12/09/2020, 1:44 AM
Hi DJ - assuming you are using default configuration for this flow, any data returned by the task is held in memory until the flow completes, and I assume that is what you’re seeing here (the gradual collection of task results in memory)
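To see why this produces the exact memory curve DJ describes, here is a minimal pure-Python sketch of the pattern Chris is pointing at. It is not Prefect's actual runner internals, and the task names are just illustrative; it only shows that a runner which keeps every task's return value in one dict holds the largest intermediate in memory until the whole flow ends:

```python
# Illustrative sketch only -- not Prefect's real flow runner. It shows
# why keeping every task's return value until the flow ends makes
# memory grow and then plateau instead of being freed between tasks.

def run_flow(tasks):
    """Run tasks in order, retaining each result for the whole run."""
    results = {}  # lives for the entire flow, so nothing is freed early
    for name, fn in tasks:
        results[name] = fn()  # the return value stays referenced here
    return results

# Each "task" returns a payload; the large one stays resident afterwards.
tasks = [
    ("link_patient_events", lambda: [0] * 1_000_000),  # large intermediate
    ("mark_complete", lambda: True),
]
results = run_flow(tasks)

# Even though link_patient_events finished long ago, its payload is
# still referenced by the results dict until the flow completes.
print(len(results["link_patient_events"]))  # 1000000
print(results["mark_complete"])             # True
```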

Joe Schmid

12/09/2020, 3:20 PM
@DJ Erraballi Some more context: Prefect allows you to inspect the results from all tasks after a Flow run completes, e.g.:
Copy code
>>> task_ref = flow.get_tasks()[0]
>>> state = flow.run()
>>> state._result.value  # a Flow State's Result value is its Tasks' States
{<Task: add>: <Success: "Task run succeeded.">,
 <Task: add>: <Success: "Task run succeeded.">}
>>> state.result  # the public property aliases the same API as above
{<Task: add>: <Success: "Task run succeeded.">,
 <Task: add>: <Success: "Task run succeeded.">}
>>> state.result[task_ref]._result  # a Task State's Result contains the Task's return value
<Result: 1>
So the Flow runner process will (by default) keep all task results in memory. If you wanted to avoid saving task results in memory but still pass data (references) between tasks, you could use something like S3Result configured either at the Flow level (persist task output in S3 for all tasks in the Flow) or the Task level (persist just for this task). Much more here:
So if you made a one-line update to your Flow like:
Copy code
with SRMFlow(
        result=S3Result(bucket='your-bucket')  # <---- Added this line
) as trigger_patient_loader_flow:
You could leave the rest of the Flow unchanged, and memory usage in the Flow runner process should drop (well, fluctuate, as tasks load data from S3 into memory and free it when done), since the task results would just be references to files in S3.
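To make the reference-passing idea concrete without needing Prefect or S3 installed, here is a hedged sketch of the same pattern using local files. This is not Prefect's Result API (the `persist`/`load` helpers are made up for illustration); it only shows that when each step persists its output and hands back a small path string, the runner's results dict stays tiny regardless of payload size:

```python
# Illustrative sketch of result "references" -- not Prefect's Result API.
# Each task writes its real output to a file and returns only a short
# path string, so the flow runner's results dict stays small while the
# large intermediates live on disk (as they would in S3 with S3Result).
import json
import tempfile
import uuid
from pathlib import Path

def persist(value, directory):
    """Write a task's output to a file; return the path (the 'result')."""
    path = Path(directory) / f"{uuid.uuid4()}.json"
    path.write_text(json.dumps(value))
    return str(path)

def load(path):
    """Downstream tasks load the data only while they need it."""
    return json.loads(Path(path).read_text())

with tempfile.TemporaryDirectory() as d:
    # The large intermediate is written out, not held by the runner.
    ref = persist([0] * 100_000, d)
    results = {"aggregate_patients": ref}  # runner keeps just a path

    # A downstream task dereferences only when it actually needs the data.
    data = load(results["aggregate_patients"])
    print(len(data))  # 100000
```

The trade-off is the one Joe notes above: memory fluctuates as each task loads and drops its inputs, instead of accumulating for the life of the flow.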