# prefect-community
p
Hi everyone. I'd like to understand how memory is managed in a flow. I have a long-running flow that calls an API to get data. The flow works roughly like this:
• get the list of URLs to retrieve (about 180k URLs)
• break the URLs into groups of 150 (list of lists)
• a mapped task receives a list of 150 URLs and calls the API
• another mapped task receives the API output for 150 URLs and saves the output to S3
I am using S3 result caching for the data-intensive tasks (tasks 2 and 3 above) and Prefect Results for the rest of the tasks. I am seeing that memory utilization keeps increasing until the container runs out of RAM (this is running on ECS Fargate). It seems to be keeping the data retrieved from the API in memory, even after it's saved to S3. I can increase the container RAM, but I am trying to understand how I could write the flow so that it does not run out of RAM. This is what the Memory Utilization chart looks like. Eventually, the container dies and Prefect Cloud restarts it. Any suggestions?
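For reference, the flow is wired up roughly like this (simplified sketch with placeholder task names and URLs, not the actual code):

from prefect import Flow, task

@task
def get_urls():
    # placeholder: really returns the ~180k URLs from the source system
    return [f"https://api.example.com/item/{i}" for i in range(1000)]

@task
def chunk_urls(urls, size=150):
    # break the URL list into groups of 150
    return [urls[i:i + size] for i in range(0, len(urls), size)]

@task
def call_api(batch):
    # placeholder: really calls the API for each of the 150 URLs in the batch
    return [{"url": url} for url in batch]

@task
def save_to_s3(profiles):
    # placeholder: really writes the batch of profiles to S3
    return "done"

with Flow("ingest-profiles") as flow:
    batches = chunk_urls(get_urls())
    api_output = call_api.map(batches)   # one mapped child per group of 150 URLs
    save_to_s3.map(api_output)           # second mapped task receives the API output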
k
Prefect does not manage memory by default. When you do mapping, all the results are held in memory, so what you can do is persist the API output yourself and then return just the location. The downstream tasks then hold only the location. This will minimize the footprint. You can also try using `del` and `gc.collect()` inside tasks.
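For example, something along these lines (rough, untested sketch; the bucket name, key layout, and the placeholder API call are not from your code):

import json

import boto3
import prefect
from prefect import task

@task
def fetch_and_persist(batch):
    # placeholder for the real API call over the 150 URLs in this batch
    profiles = [{"url": url} for url in batch]

    # write the batch to S3 from inside the task...
    key = (
        "ingestion/employee_profiles/"
        f"{prefect.context.flow_run_id}_{prefect.context.map_index}.json"
    )
    boto3.client("s3").put_object(
        Bucket="my-bucket",   # placeholder bucket name
        Key=key,
        Body="\n".join(json.dumps(p) for p in profiles).encode("utf-8"),
    )

    # ...and hand only the key to downstream tasks, so the big payload can be
    # garbage-collected as soon as this task returns
    return key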
p
Thanks, Kevin. Question on your `del` and `gc.collect()` suggestions. This is what the save to s3 task looks like:
@task(trigger=any_successful)
def save_to_s3(profiles: Sequence[Dict[str, Any]], dry_run: bool = False) -> str:

    # save outside of the "ingestion" folder if it's a test run
    base_prefix = "dryrun" if dry_run else "ingestion"

    key = (
        f"{base_prefix}/employee_profiles/"
        "refresh_mode={parameters[refresh-mode]}/date={today}/{flow_run_id}_{map_index}.json".format(
            **prefect.context
        )
    )
    data = "\n".join([json.dumps(profile) for profile in profiles])
    prefect.context.logger.info(
        f"Writing {len(profiles)} profiles to s3://{S3_BUCKET}/{key}"
    )

    return s3_upload_task.run(data=data, key=key)
would I delete the `data` and `profiles` variables after saving to s3? I'll look into the other suggestion as well.
k
yeah you can do something like:
@task
def ...
    ...
    loc = s3_upload_task.run(data=data, key=key)
    del data, key
    gc.collect()
    return loc
and see if it helps?
p
Ok. I'll try that. For your first suggestion, it seems that I'd have to combine the task that pulls the data with the one that saves it into a single task. Do you agree? Once the data is persisted to S3 there is nothing else to do in this flow, so there is no point in persisting it twice.
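So roughly something like this, reusing the save_to_s3 body from above (untested sketch; fetch_profiles is just a stand-in for whatever the current API task does):

@task(trigger=any_successful)
def fetch_and_save_to_s3(urls: Sequence[str], dry_run: bool = False) -> str:

    # stand-in for the existing API task body that turns 150 URLs into profiles
    profiles = fetch_profiles(urls)

    # save outside of the "ingestion" folder if it's a test run
    base_prefix = "dryrun" if dry_run else "ingestion"

    key = (
        f"{base_prefix}/employee_profiles/"
        "refresh_mode={parameters[refresh-mode]}/date={today}/{flow_run_id}_{map_index}.json".format(
            **prefect.context
        )
    )
    data = "\n".join(json.dumps(profile) for profile in profiles)
    prefect.context.logger.info(
        f"Writing {len(profiles)} profiles to s3://{S3_BUCKET}/{key}"
    )

    # only the returned key travels between tasks; the profiles stay local to this task
    return s3_upload_task.run(data=data, key=key)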
k
Yeah I agree
👍 1