Jan Vlčinský
09/10/2021, 10:31 AM
ResourceManager `map`ped to list of days.
We have a task that archives data for one day. The task uses a temporary directory (mapped to shared memory), and we use a ResourceManager to clean up the tmp dir once a day has been processed (there is one subdirectory per processed day).
Now we want to run the same thing for a list of days, and we are struggling with mapping the days onto the resource manager.
Jan Vlčinský
09/10/2021, 10:31 AM
Kevin Kho
@task
def abc(x):
    with TempDir() as tmpdir:
        # do stuff
        return
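A minimal sketch of the pattern above, using only the standard library so it runs without Prefect: the temporary directory is created and removed inside the per-day function, so each mapped day cleans up after itself. `archive_day` and the day strings are hypothetical; in a flow, the function would be a `@task` mapped over the list of days.

```python
import tempfile
from pathlib import Path


def archive_day(day_str):
    """Process one day inside its own temporary directory (hypothetical helper)."""
    # TemporaryDirectory removes the directory when the block exits,
    # whether the body finishes normally or raises.
    with tempfile.TemporaryDirectory(prefix=f"{day_str}-") as tmpdir:
        workdir = Path(tmpdir)
        # Stand-in for the real work: write one file for this day.
        (workdir / "data.xml").write_text(f"<day>{day_str}</day>")
        file_count = len(list(workdir.iterdir()))
    # Return the path as well, so the caller can check that it is gone.
    return day_str, file_count, workdir


# Plain-Python equivalent of mapping the task over a list of days.
days = ["2021-09-08", "2021-09-09", "2021-09-10"]
results = [archive_day(day) for day in days]
```

Because setup and cleanup live inside the function, nothing has to be mapped alongside the days except the days themselves.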
Kevin Kho
Jan Vlčinský
09/10/2021, 2:52 PM
Kevin Kho
`__enter__` and `__exit__` methods?
Jan Vlčinský
09/10/2021, 2:54 PM
Kevin Kho
Jan Vlčinský
09/10/2021, 2:59 PM
Jan Vlčinský
09/10/2021, 4:01 PM
import shutil
from contextlib import contextmanager
from pathlib import Path

import prefect


@contextmanager
def managed_tmp_dir(relative_workdir):
    base_dir = prefect.context.base_dir.path
    base_path = Path(base_dir) / relative_workdir
    base_path.mkdir(exist_ok=True, parents=True)
    try:
        yield base_path
    finally:
        shutil.rmtree(base_path, ignore_errors=True)
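To see the cleanup guarantee of the helper above without a running flow, here is a stand-alone sketch. It assumes the base directory is passed in explicitly instead of being read from `prefect.context.base_dir.path` (a custom context value in the author's setup); the `base_dir` parameter and the day subdirectory names are illustrative only.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def managed_tmp_dir(base_dir, relative_workdir):
    """Create base_dir/relative_workdir and remove it when the block exits."""
    base_path = Path(base_dir) / relative_workdir
    base_path.mkdir(exist_ok=True, parents=True)
    try:
        yield base_path
    finally:
        # Runs on normal exit and when the body raises.
        shutil.rmtree(base_path, ignore_errors=True)


# Assumption: a throwaway base directory stands in for the shared-memory mount.
base = tempfile.mkdtemp()

# Case 1: the body finishes normally.
with managed_tmp_dir(base, "2021-09-10") as workdir:
    (workdir / "part.xml").write_text("<x/>")
    existed_inside = workdir.exists()
removed_after_success = not workdir.exists()

# Case 2: the body raises; the directory should still be removed.
failed_dir = None
try:
    with managed_tmp_dir(base, "2021-09-11") as workdir2:
        failed_dir = workdir2
        raise RuntimeError("simulated failure while archiving")
except RuntimeError:
    pass
removed_after_failure = not failed_dir.exists()

shutil.rmtree(base, ignore_errors=True)  # tidy up the throwaway base
```

The `try`/`finally` around `yield` is what makes the removal run in both cases.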
Jan Vlčinský
09/10/2021, 4:02 PM
class CreateDailyArchive(ShellTask):
    def run(self, f1_sample_bucket_key, day_str, max_versions):
        """Fetch XMLs for one day and create an archive.

        It attempts to clean up all used resources as soon as possible, but it is advisable
        to have a follow-up task which unconditionally cleans up what could be left.
        """
        logger = prefect.context.get("logger")
        day = datetime.strptime(day_str, "%Y-%m-%d")
        key_namer = KeyNamer(prefect.context, f1_sample_bucket_key)
        source_bucket_key = key_namer.f1_sample()
        target_bucket_key = key_namer.daily_archive(day)
        relative_workdir = key_namer.f2_day_workdir(day)
        with managed_tmp_dir(relative_workdir) as workdir:
            logger.info("Getting f1_dvi.")
            f1_dvi = self.get_f1_dvi(f1_sample_bucket_key, day)
            logger.info("Fetching versions.")
            xmlgz_dir = self.fetch_files(
                source_bucket_key, f1_dvi, workdir, max_versions
            )
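The docstring above recommends a follow-up step that cleans up unconditionally. A minimal sketch of such a step, assuming the per-day working directories all live under one known base directory; `cleanup_workdirs` is a hypothetical helper, and wiring it into the flow so that it runs even after failures (for example with Prefect's `always_run` trigger) is not shown.

```python
import shutil
import tempfile
from pathlib import Path


def cleanup_workdirs(base_dir, relative_workdirs):
    """Remove every listed working directory under base_dir.

    Returns the relative names that were present when the sweep ran.
    """
    found = []
    for rel in relative_workdirs:
        path = Path(base_dir) / rel
        if path.exists():
            # ignore_errors keeps the sweep going if one directory is troublesome.
            shutil.rmtree(path, ignore_errors=True)
            found.append(rel)
    return found


# Simulate one leftover day directory under a throwaway base.
base = Path(tempfile.mkdtemp())
(base / "2021-09-09").mkdir()
(base / "2021-09-09" / "stale.xml").write_text("<x/>")

removed = cleanup_workdirs(base, ["2021-09-09", "2021-09-10"])
leftover = [p.name for p in base.iterdir()]
shutil.rmtree(base, ignore_errors=True)  # tidy up the throwaway base
```

Days whose directory was already removed by the context manager are simply skipped, so the sweep is safe to run after every flow run.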
Kevin Kho