Pedro Machado
03/22/2022, 4:43 PM
Kevin Kho
03/22/2022, 5:25 PM
return the location. The downstream tasks then hold only the location, which will minimize the memory footprint. You can also try using del and gc.collect() inside tasks.
Pedro Machado
03/22/2022, 5:32 PM
del and gc.collect() suggestions.
This is what the save to s3 task looks like:
@task(trigger=any_successful)
def save_to_s3(profiles: Sequence[Dict[str, Any]], dry_run: bool = False) -> str:
    # save outside of the "ingestion" folder if it's a test run
    base_prefix = "dryrun" if dry_run else "ingestion"
    key = (
        f"{base_prefix}/employee_profiles/"
        "refresh_mode={parameters[refresh-mode]}/date={today}/{flow_run_id}_{map_index}.json".format(
            **prefect.context
        )
    )
    data = "\n".join([json.dumps(profile) for profile in profiles])
    prefect.context.logger.info(
        f"Writing {len(profiles)} profiles to s3://{S3_BUCKET}/{key}"
    )
    return s3_upload_task.run(data=data, key=key)
Would I delete the data and profile variables after saving to s3?
I'll look into the other suggestion as well.
Kevin Kho
03/22/2022, 5:35 PM
@task
def ...
    ...
    loc = s3_upload_task.run(data=data, key=key)
    del data, key
    gc.collect()
    return loc
and see if it helps?
Pedro Machado
03/22/2022, 5:35 PM
Kevin Kho
03/22/2022, 5:35 PM
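For reference, the pattern suggested in this thread can be tried outside Prefect with a minimal sketch like the one below. It is only an illustration under stated assumptions: fake_upload is a hypothetical stand-in for s3_upload_task.run that writes to a local temp directory, and save_and_release is an invented name, not part of the original flow. Note that del only removes the local name. The memory is released only once no other reference remains, and the caller may still hold the profiles list. In CPython, gc.collect() mainly reclaims reference cycles.

```python
import gc
import json
import os
import tempfile
from typing import Any, Dict, Sequence


def fake_upload(data: str, key: str) -> str:
    """Hypothetical stand-in for s3_upload_task.run: write locally, return the path."""
    path = os.path.join(tempfile.gettempdir(), key.replace("/", "_"))
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(data)
    return path


def save_and_release(profiles: Sequence[Dict[str, Any]], key: str) -> str:
    # Serialize to newline-delimited JSON, as in the task shown in the thread.
    data = "\n".join(json.dumps(profile) for profile in profiles)
    loc = fake_upload(data=data, key=key)
    # Drop the local reference to the large string before returning.
    del data
    # Ask the collector to reclaim any unreachable reference cycles.
    gc.collect()
    # Only the small location string leaves the function.
    return loc


if __name__ == "__main__":
    print(save_and_release([{"id": 1}, {"id": 2}], key="dryrun/example.json"))
```

Downstream code would then receive only the returned location and read the data back when it is actually needed, rather than passing the serialized payload between tasks.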