    Pedro Machado

    6 months ago
    Hi everyone. I'd like to understand how memory is managed in a flow. I have a long-running flow that calls an API to get data. The flow works roughly like this:
    • get the list of URLs to retrieve (about 180k URLs)
    • break the URLs into groups of 150 (a list of lists)
    • a mapped task receives a list of 150 URLs and calls the API
    • another mapped task receives the API output for the 150 URLs and saves it to S3
    I am using S3 result caching for the data-intensive tasks (tasks 2 and 3 above) and Prefect Results for the rest of the tasks. I am seeing that memory utilization keeps increasing until the container runs out of RAM (this is running on ECS Fargate). The flow seems to keep the data retrieved from the API in memory even after it's saved to S3. I can increase the container RAM, but I'm trying to understand how I could write the flow so that it does not run out of RAM. This is what the Memory Utilization chart looks like. Eventually, the container dies and Prefect Cloud restarts it. Any suggestions?
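The "break the URLs into groups of 150" step above can be sketched in plain Python; the URL list here is a small stand-in for the real ~180k list:

```python
# Split a flat list of URLs into chunks of at most 150 (stand-in data, not the real list).
urls = [f"https://api.example.com/item/{i}" for i in range(400)]
chunks = [urls[i:i + 150] for i in range(0, len(urls), 150)]
# 400 URLs -> three chunks of 150, 150, and 100
```

Each element of `chunks` is then what one mapped-task call receives.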
    Kevin Kho

    6 months ago
    Prefect does not manage memory by default. When you do mapping, all the results are held in memory, so what you can do is persist the API output yourself and then return the location. The downstream tasks then hold only the location. This will minimize the footprint. You can also try using del and gc.collect() inside tasks
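A minimal sketch of the pattern Kevin describes, in plain Python rather than Prefect (fetch_and_persist and store are hypothetical names; the dict stands in for S3 and the list comprehension for the API call): each mapped call persists its payload immediately and returns only the key, so what accumulates across all chunks is a list of short strings rather than the payloads themselves.

```python
def fetch_and_persist(chunk_index, urls, store):
    """Fetch data for one chunk of URLs, persist it, and return only its location."""
    payload = [{"url": u, "data": f"response for {u}"} for u in urls]  # stand-in for the API call
    key = f"ingestion/chunk-{chunk_index}.json"
    store[key] = payload  # stand-in for the S3 upload
    return key            # downstream tasks hold this short string, not the payload

store = {}
chunks = [["u1", "u2"], ["u3"]]
keys = [fetch_and_persist(i, c, store) for i, c in enumerate(chunks)]
# keys == ["ingestion/chunk-0.json", "ingestion/chunk-1.json"]
```

In the real flow, the mapped task would return the S3 key and any downstream task would take the key, not the data.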
    Pedro Machado

    6 months ago
    Thanks, Kevin. Question on your del and gc.collect() suggestion. This is what the save-to-S3 task looks like:
    import json
    from typing import Any, Dict, Sequence

    import prefect
    from prefect import task
    from prefect.triggers import any_successful

    @task(trigger=any_successful)
    def save_to_s3(profiles: Sequence[Dict[str, Any]], dry_run: bool = False) -> str:
    
        # save outside of the "ingestion" folder if it's a test run
        base_prefix = "dryrun" if dry_run else "ingestion"
    
        key = (
            f"{base_prefix}/employee_profiles/"
            "refresh_mode={parameters[refresh-mode]}/date={today}/{flow_run_id}_{map_index}.json".format(
                **prefect.context
            )
        )
        data = "\n".join([json.dumps(profile) for profile in profiles])
        prefect.context.logger.info(
            f"Writing {len(profiles)} profiles to s3://{S3_BUCKET}/{key}"
        )
    
        return s3_upload_task.run(data=data, key=key)
    would I delete the data and profiles variables after saving to S3? I'll look into the other suggestion as well.
    Kevin Kho

    6 months ago
    yeah you can do something like:
    import gc

    @task
    def save_to_s3(profiles, dry_run=False):
        ...
        loc = s3_upload_task.run(data=data, key=key)
        del data, key
        gc.collect()
        return loc
    and see if it helps?
    Pedro Machado

    6 months ago
    Ok. I'll try that. For your first suggestion, it seems I'd have to combine the task that pulls the data with the one that saves it into a single task. Do you agree? Once the data is persisted to S3, there is nothing else to do in this flow, so there is no point in persisting it twice.
    Kevin Kho

    6 months ago
    Yeah I agree
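The combined approach Pedro proposes (and Kevin agrees with) might look roughly like this plain-Python sketch; fetch_and_save and uploaded are hypothetical stand-ins for the merged Prefect task and the S3 upload, not real APIs. The API payload is created, serialized, uploaded, and dropped inside one function, and only the key is returned:

```python
import gc
import json

def fetch_and_save(map_index, urls, uploaded):
    """Combined 'pull' + 'save' step: the payload never leaves this function."""
    profiles = [{"url": u} for u in urls]          # stand-in for the API call
    data = "\n".join(json.dumps(p) for p in profiles)
    key = f"ingestion/{map_index}.json"
    uploaded[key] = data                           # stand-in for s3_upload_task.run(...)
    del profiles, data                             # drop references before returning
    gc.collect()
    return key

uploaded = {}
key = fetch_and_save(0, ["u1", "u2"], uploaded)
# key == "ingestion/0.json"
```

Because the mapped task returns only the key, the scheduler never holds the 150-URL payloads after each chunk is written.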