j
We need `ResourceManager` `map`ped to a list of days. We have a task that archives data for one day. The task uses a temporary dir (mapped to shared memory), and we use a ResourceManager to clean up the tmp dir once a day is processed (there is one subdir per processed day). Now we want to run the same for a list of days, and we struggle with mapping days to the resource manager.
To me it seems like the ResourceManager cannot simply be mapped this way.
k
Hey @Jan Vlčinský, I think I understand what you are saying. Do you think it can be achieved if you did:
```python
from prefect import task


@task
def abc(x):
    with TempDir() as tmpdir:  # TempDir: any temporary-directory context manager
        ...  # do stuff with tmpdir
    return
```
But yeah, the map would create multiple instances (one per mapped day).
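A minimal sketch of why the inside-the-task pattern maps cleanly, assuming Python's built-in `map` as a stand-in for Prefect's task mapping and the standard-library `tempfile.TemporaryDirectory` in place of `TempDir`. Each call enters its own context manager, so every day gets a separate directory that is removed as soon as that call finishes. `process_day` and the day strings are hypothetical.

```python
import os
import tempfile


def process_day(day_str):
    """Hypothetical per-day work: each call gets its own temporary directory."""
    with tempfile.TemporaryDirectory(prefix=f"{day_str}-") as tmpdir:
        # Write a scratch file to show the directory is usable inside the block.
        scratch = os.path.join(tmpdir, "data.txt")
        with open(scratch, "w") as fh:
            fh.write(day_str)
        existed_inside = os.path.isdir(tmpdir)
    # Leaving the with-block removed the directory and its contents.
    return tmpdir, existed_inside, os.path.exists(tmpdir)


days = ["2021-09-01", "2021-09-02", "2021-09-03"]
# Built-in map stands in for Prefect's task mapping: one context manager per day.
results = list(map(process_day, days))
for path, inside, after in results:
    print(path, inside, after)
```

No shared resource manager has to be split across mapped items; the cleanup scope is simply the body of each call.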
j
@Kevin Kho what you propose looks exactly like the solution we need. It is the inverse of what is in the docs (here the manager does not manage the task from outside but serves inside it). It should definitely simplify our code.
k
Oh ok. Do you know how to do that, with the `__enter__` and `__exit__` methods?
j
Yes. It is an ordinary context manager.
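For the `__enter__`/`__exit__` route, a class-based sketch of a temporary-directory context manager; it behaves like a `@contextmanager` generator with `try`/`finally`. `ManagedTmpDir` and its `base_dir`/`relative_workdir` parameters are hypothetical names chosen for illustration.

```python
import shutil
import tempfile
from pathlib import Path


class ManagedTmpDir:
    """Hypothetical sketch: create base_dir/relative_workdir on entry,
    remove it on exit, whether or not the with-block raised."""

    def __init__(self, base_dir, relative_workdir):
        self.path = Path(base_dir) / relative_workdir

    def __enter__(self):
        self.path.mkdir(exist_ok=True, parents=True)
        return self.path

    def __exit__(self, exc_type, exc_value, traceback):
        shutil.rmtree(self.path, ignore_errors=True)
        # Returning False lets any exception from the block propagate.
        return False


base = tempfile.mkdtemp()
with ManagedTmpDir(base, "2021-09-01") as workdir:
    (workdir / "day.xml").write_text("<day/>")
    print(workdir.exists())  # True
print(workdir.exists())  # False
```

Returning `False` from `__exit__` matters: returning a truthy value would silently swallow errors raised while processing the day.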
k
I think the resource manager in the docs is more for temporary compute resources.
j
I sometimes forget about the standard tools we have at hand.
My temporary directory context manager:
```python
import shutil
from contextlib import contextmanager
from pathlib import Path

import prefect


@contextmanager
def managed_tmp_dir(relative_workdir):
    # Base directory for work dirs, taken from prefect.context.
    base_dir = prefect.context.base_dir.path
    base_path = Path(base_dir) / relative_workdir
    base_path.mkdir(exist_ok=True, parents=True)
    try:
        yield base_path
    finally:
        # Remove the day's work dir even if the with-block raised.
        shutil.rmtree(base_path, ignore_errors=True)
```
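A Prefect-free variant of `managed_tmp_dir` for trying the cleanup behavior locally. `managed_tmp_dir_at` and its explicit `base_dir` argument are hypothetical, standing in for `prefect.context.base_dir.path`, and `"work/2021-09-01"` is an illustrative relative workdir. It shows that the `finally` clause removes the day directory even when the body raises, while parent directories created by `mkdir(parents=True)` are left in place.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def managed_tmp_dir_at(base_dir, relative_workdir):
    """Same logic as managed_tmp_dir, but base_dir is passed in explicitly."""
    base_path = Path(base_dir) / relative_workdir
    base_path.mkdir(exist_ok=True, parents=True)
    try:
        yield base_path
    finally:
        # Runs on normal exit and when the with-block raises.
        shutil.rmtree(base_path, ignore_errors=True)


root = tempfile.mkdtemp()
try:
    with managed_tmp_dir_at(root, "work/2021-09-01") as workdir:
        (workdir / "partial.xml.gz").write_bytes(b"")
        raise RuntimeError("simulated failure mid-archive")
except RuntimeError as exc:
    print("caught:", exc)

print(workdir.exists())  # False: the day dir was removed anyway
print((Path(root) / "work").exists())  # True: only the leaf dir is removed
```

If the process is killed outright (for example by the OOM killer), the `finally` clause never runs, which is why a separate sweep of the base directory can still be worthwhile.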
Used in a task as follows:
```python
from datetime import datetime

import prefect
from prefect.tasks.shell import ShellTask


class CreateDailyArchive(ShellTask):
    def run(self, f1_sample_bucket_key, day_str, max_versions):
        """Fetch XMLs for one day and create an archive.

        It attempts to clean up all used resources as soon as possible, but it is advisable
        to have a follow-up task which unconditionally cleans up whatever could be left.
        """
        logger = prefect.context.get("logger")

        day = datetime.strptime(day_str, "%Y-%m-%d")

        # KeyNamer: project-specific helper (not part of Prefect) building keys and paths.
        key_namer = KeyNamer(prefect.context, f1_sample_bucket_key)
        source_bucket_key = key_namer.f1_sample()
        target_bucket_key = key_namer.daily_archive(day)

        relative_workdir = key_namer.f2_day_workdir(day)

        # The day's work dir exists only for the duration of this block.
        with managed_tmp_dir(relative_workdir) as workdir:

            logger.info("Getting f1_dvi.")
            f1_dvi = self.get_f1_dvi(f1_sample_bucket_key, day)

            logger.info("Fetching versions.")
            xmlgz_dir = self.fetch_files(
                source_bucket_key, f1_dvi, workdir, max_versions
            )
```
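The docstring recommends a follow-up task that unconditionally cleans up whatever is left. A minimal plain-Python sketch, assuming all per-day work dirs live directly under one base directory; `cleanup_leftover_workdirs` and its `pattern` parameter are hypothetical. In Prefect 1.x such a task would typically get a trigger like `all_finished` so it runs even when upstream tasks fail (an assumption about how it would be wired in).

```python
import shutil
import tempfile
from pathlib import Path


def cleanup_leftover_workdirs(base_dir, pattern="*"):
    """Hypothetical follow-up cleanup: remove every subdirectory of base_dir
    matching pattern and return the removed paths (handy for logging)."""
    removed = []
    base = Path(base_dir)
    if not base.is_dir():
        # Nothing to clean: per-day tasks already removed everything.
        return removed
    for child in sorted(base.glob(pattern)):
        if child.is_dir():
            shutil.rmtree(child, ignore_errors=True)
            removed.append(child)
    return removed


root = Path(tempfile.mkdtemp())
for day in ["2021-09-01", "2021-09-02"]:
    (root / day).mkdir()
(root / "keep.txt").write_text("not a directory")

removed = cleanup_leftover_workdirs(root)
print([p.name for p in removed])  # ['2021-09-01', '2021-09-02']
print(sorted(p.name for p in root.iterdir()))  # ['keep.txt']
```

Only directories are removed, so stray files at the top level of the base dir are left for inspection rather than deleted silently.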
k
That’s nice man 👍