Kyle McChesney
05/26/2022, 4:31 PM
with Flow(
    'resource_manager',
    executor=LocalDaskExecutor(),
) as flow:
    with ResourceManager() as manager:
        something_res = something(manager)
        get_res = another_thing.map(something_res, unmapped(manager))
Mostly wondering: if I have the resource manager return “itself”, how many copies are made via map?
Kevin Kho
05/26/2022, 4:33 PM
Kyle McChesney
05/26/2022, 4:33 PM
setup. The manager object has functions like get_file, etc., to download a remote file to the temp dir. It's more or less a local file cache that keeps track of the available disk space and loads/deletes files as needed. I am wondering how this will work if I pass the manager to multiple tasks.
Kevin Kho
05/26/2022, 4:34 PM
Kyle McChesney
05/26/2022, 4:36 PM
LocalDaskExecutor
Kevin Kho
05/26/2022, 10:27 PM
Kyle McChesney
05/27/2022, 4:58 PM
threading.Lock directly worked just fine. Roughly:
from threading import Lock

from prefect import Flow, Parameter, resource_manager, task, unmapped

@resource_manager
class Manager:
    def do_thing(self, foo):
        # Only one caller at a time gets past this line.
        with self._lock:
            return foo + 1

    def setup(self):
        # The value returned here is what the tasks receive as `manager`.
        self._lock = Lock()
        return self

    def cleanup(self, _):
        pass

@task
def task(number, manager):
    return manager.do_thing(number)

with Flow('example'):
    numbers = Parameter('numbers')
    with Manager() as manager:
        task.map(numbers, unmapped(manager))
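A minimal sketch of what the lock buys here, written outside Prefect. It assumes the mapped tasks run as threads in one process and share a single manager object (a threading.Lock cannot be pickled, so it would not survive being copied to another process). The Manager below is a hypothetical stand-in, the active/max_active counters are added only for illustration, and ThreadPoolExecutor stands in for the executor:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Manager:
    """Hypothetical stand-in for the resource manager above, without Prefect."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0      # callers currently inside do_thing
        self.max_active = 0  # highest value `active` ever reached

    def do_thing(self, foo):
        with self._lock:  # blocks until the lock is free
            self.active += 1
            self.max_active = max(self.max_active, self.active)
            time.sleep(0.01)  # pretend to do some work
            self.active -= 1
            return foo + 1


manager = Manager()
with ThreadPoolExecutor(max_workers=4) as pool:
    # Four worker threads call into the same manager object.
    results = list(pool.map(manager.do_thing, range(8)))

print(results)             # [1, 2, 3, 4, 5, 6, 7, 8]
print(manager.max_active)  # 1: never more than one caller inside the lock
```

If the executor were configured to use processes instead of threads, each worker would need its own copy of the manager and this lock would no longer coordinate them.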
with self._lock will wait until the lock is free, then hold it for the duration of the block.
Kevin Kho
05/27/2022, 5:02 PM
Luke Segars
07/22/2022, 11:33 PM