Kyle McChesney

05/26/2022, 4:31 PM
Is there any documentation around ResourceManagers and concurrency? Mostly just wondering about something like this:
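from prefect import Flow, unmapped
from prefect.executors import LocalDaskExecutor

# ResourceManager, something, and another_thing are placeholders for a
# user-defined resource manager class and two tasks.
with Flow(
    'resource_manager',
    executor=LocalDaskExecutor(),
) as flow:
    with ResourceManager() as manager:
        something_res = something(manager)
        get_res = another_thing.map(something_res, unmapped(manager))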
Mostly I'm wondering: if I have the resource manager return “itself”, how many copies are made via map?

Kevin Kho

05/26/2022, 4:33 PM
I believe one, if it's unmapped, but I'm only 90% sure.

Kyle McChesney

05/26/2022, 4:33 PM
For context, I am setting up a temporary directory in my manager and returning the manager itself from `setup`. The manager object has methods like `get_file`, etc. to download a remote file to the temp dir. It's more or less a local file cache that keeps track of the available disk space and loads/deletes files as needed. I am wondering how this will work if I pass the manager to multiple tasks.

Kevin Kho

05/26/2022, 4:34 PM
I think the issue there is potentially hitting the same file concurrently, right?

Kyle McChesney

05/26/2022, 4:36 PM
I think that is one issue, yeah. The other is needing some kind of transaction. Otherwise two mapped tasks can ask for files simultaneously and, depending on the order of the “available space” checks, both get their files downloaded and together use up all the disk space.
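One way to get the “transaction” described here is to make the space check and the space claim a single atomic step guarded by a lock. A minimal sketch, assuming a hypothetical `SpaceBudget` helper that the manager would own; the thread pool stands in for mapped tasks running on threads.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SpaceBudget:
    """Hypothetical helper: "check free space, then claim it" as one atomic step."""

    def __init__(self, capacity_bytes: int):
        self.capacity_bytes = capacity_bytes
        self.used_bytes = 0
        self._lock = threading.Lock()

    def try_reserve(self, nbytes: int) -> bool:
        # Check and update under the same lock, so two tasks can never both
        # pass the check against the same "available space" value.
        with self._lock:
            if self.used_bytes + nbytes > self.capacity_bytes:
                return False
            self.used_bytes += nbytes
            return True

    def release(self, nbytes: int) -> None:
        with self._lock:
            self.used_bytes -= nbytes


budget = SpaceBudget(capacity_bytes=100)
# Ten concurrent requests for 30 bytes each: only three fit into 100 bytes.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(budget.try_reserve, [30] * 10))

print(results.count(True), budget.used_bytes)  # 3 90
```

Because only the reservation is under the lock, the actual downloads can still run in parallel after a successful `try_reserve`; a task whose download fails would call `release` to hand its bytes back.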
@Kevin Kho, following up on this: I’ve confirmed that it seems to be the same instance. I am still trying to figure out what underlying concurrency primitives are being used here. At some point, is each mapped task execution in its own thread or process? What mechanism allows these objects to be shared between threads/processes? All of this is assuming `LocalDaskExecutor`.

Kevin Kho

05/26/2022, 10:27 PM
If you use LocalDaskExecutor with threads, memory can be shared. If you use processes, I believe Dask copies it. It might even fail with processes unless that unmapped object can be serialized, which connections normally can’t be.
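The difference can be illustrated with the standard library alone. This sketch uses `ThreadPoolExecutor` as a stand-in for thread-based execution and a `pickle` round-trip as a stand-in for what a process-based scheduler has to do with task arguments; it approximates the behavior and is not Dask's actual code path. `Manager` here is a hypothetical placeholder.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor


class Manager:
    """Hypothetical placeholder for the resource returned by setup."""

    def __init__(self):
        self.payload = {"files": []}


manager = Manager()

# Thread-based workers all receive a reference to the very same object.
with ThreadPoolExecutor(max_workers=4) as pool:
    ids = set(pool.map(lambda m: id(m), [manager] * 8))
print(ids == {id(manager)})  # True

# Sending an object to another process means serializing it; the round-trip
# yields a copy, so changes made by a worker are not seen by the parent.
clone = pickle.loads(pickle.dumps(manager))
print(clone is manager)  # False

# Some objects, such as locks (or open connections), cannot be pickled at all.
manager.lock = threading.Lock()
try:
    pickle.dumps(manager)
except TypeError as exc:
    print("cannot pickle:", exc)
```

This is also why a `threading.Lock` only coordinates threads within one process; it would not protect shared state across worker processes.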

Kyle McChesney

05/27/2022, 4:58 PM
Seems like using a `threading.Lock` directly worked just fine. Roughly:
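from threading import Lock

from prefect import resource_manager


@resource_manager
class Manager:
    def do_thing(self, foo):
        # Only one thread at a time may run this block.
        with self._lock:
            return foo + 1

    def setup(self):
        # Create the lock once; with LocalDaskExecutor threads, every mapped
        # task receives this same instance (and therefore the same lock).
        self._lock = Lock()
        return self

    def cleanup(self, _):
        pass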
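from prefect import Flow, Parameter, task, unmapped
from prefect.executors import LocalDaskExecutor


@task
def add_one(number, manager):
    return manager.do_thing(number)


with Flow('example', executor=LocalDaskExecutor()) as flow:
    numbers = Parameter('numbers')
    with Manager() as manager:
        add_one.map(numbers, unmapped(manager))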
`with self._lock` waits until the lock is free, then holds it for the duration of the block.
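For reference, `with lock:` behaves like calling `acquire()` and then `release()` in a `finally` block, so the lock is released even if the body raises. A small standard-library sketch (the counter and function names are illustrative) showing both spellings keeping a shared counter consistent across threads:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
counter = 0


def add_one_with_statement() -> None:
    global counter
    with lock:  # blocks until the lock is free, releases on exit (even on error)
        counter += 1


def add_one_explicit() -> None:
    global counter
    lock.acquire()  # the same behavior, spelled out
    try:
        counter += 1
    finally:
        lock.release()


# Leaving the with-block waits for all submitted calls to finish.
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(1000):
        pool.submit(add_one_with_statement)
        pool.submit(add_one_explicit)

print(counter)  # 2000
```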

Kevin Kho

05/27/2022, 5:02 PM
Ah, I see. That’s nice.

Luke Segars

07/22/2022, 11:33 PM
This just really helped me! Thank you!!