2) How to handle multithreading inside custom modu...
# ask-community
i
2) How to handle multithreading inside custom modules? The task runs from Prefect, but the code is defined in a custom module. If there is a multithreading block like the one below, Prefect does not wait until all futures/threads are completed; it marks the task as successful immediately and exits.
# with concurrent.futures.thread.ThreadPoolExecutor(max_workers=10,
#                                                   thread_name_prefix="flow_") as executor:
#     futures = [executor.submit(self._run, df) for df in self.dfs]
#
#     for future in as_completed(futures):
#         try:
#             future.result()
#         except Exception as exc:
#             logger.exception(exc)
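For anyone who wants to reproduce this outside Prefect, here is a minimal, self-contained sketch of the same pattern using only the standard library. `run_one` and `run_all` are hypothetical stand-ins for `self._run` and the surrounding method, not names from the actual module. In plain Python, leaving the `with` block waits for every submitted future.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


def run_one(item):
    """Hypothetical stand-in for self._run(df)."""
    if item < 0:
        raise ValueError(f"negative input: {item}")
    return item * 2


def run_all(items, max_workers=10):
    """Submit every item, wait for all futures, return (results, errors)."""
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix="flow_") as executor:
        # Map each future back to its input so failures can be attributed.
        futures = {executor.submit(run_one, item): item for item in items}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:
                logger.exception(exc)
                errors.append((futures[future], exc))
    # Leaving the `with` block calls executor.shutdown(wait=True),
    # so all worker threads have finished by this point.
    return results, errors
```

Returning the errors instead of only logging them lets the caller decide whether to fail.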
a
Normally with Prefect, you shouldn't have to use multithreading yourself, since Prefect does that for you automatically when you use mapping.
But if you want to use multithreading yourself, you need to make sure that the Prefect context stays thread-safe. This is a bit tricky: there is some functionality built into Prefect to make that easier, but it would involve some work on your end. In contrast, if you leverage mapping and e.g. LocalDaskExecutor, which uses multithreading under the hood by default, Prefect will make sure that the context stays thread-safe.
btw thank you so much for posting separate questions in separate threads, you are a role model! 🙏
i
Yep, for DaskCluster it's clear. It controls multithreading itself and everything works fine. I was asking because our library has built-in multithreading features that allow you to submit tasks from any command line, but when you take that library as is and run it in Prefect, that won't work. So basically we need to wrap each library function in a Prefect Task and then pass them through Prefect's mapping feature.
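A rough sketch of that wrapping approach, assuming the Prefect 1.x API (`Flow`, `task`, `prefect.executors.LocalDaskExecutor`). `run_one`, `build_flow`, `run_flow` and the flow name are hypothetical and only illustrate the idea. Prefect is imported lazily so the module still loads where Prefect 1.x is not installed.

```python
def run_one(item):
    """Hypothetical library function; stands in for self._run(df)."""
    return item * 2


def build_flow(items):
    """Wrap run_one in a Prefect task and map it over items."""
    from prefect import Flow, task  # assumes Prefect 1.x

    run_one_task = task(run_one)  # wrap the plain function as a Task

    with Flow("mapped-library-calls") as flow:
        # One mapped child task per element of items.
        run_one_task.map(items)
    return flow


def run_flow(items, num_workers=10):
    """Run the flow locally on a thread-based LocalDaskExecutor."""
    from prefect.executors import LocalDaskExecutor  # assumes Prefect 1.x

    flow = build_flow(items)
    executor = LocalDaskExecutor(scheduler="threads", num_workers=num_workers)
    return flow.run(executor=executor)
```

With this layout the library keeps its plain functions, and Prefect handles the threading and tracks each mapped child task.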
k
Coming from the other thread: if you pass the logger as an input into your multiprocessing function, it will work, but really just use LocalDask and then you get observability into your tasks as well.
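A minimal sketch of passing the logger in explicitly, using a thread pool to match the original snippet. `run_one` and `run_all` are hypothetical names. Inside a Prefect 1.x task the logger would presumably come from `prefect.context.get("logger")`; a standard `logging.Logger` is used here so the example runs on its own.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def run_one(item, logger):
    """Hypothetical worker that logs through the logger it was handed."""
    logger.info("processing %s", item)
    return item * 2


def run_all(items, logger, max_workers=4):
    """Run run_one over items in threads, sharing one explicit logger."""
    worker = partial(run_one, logger=logger)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order.
        return list(executor.map(worker, items))
```

Because the logger is a plain argument, the worker threads do not need to look anything up from thread-local or context state.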
a
"Yeap, for DaskCluster its clear. It controls multithreading itself and that all things work fine."
Actually, LocalDaskExecutor doesn't use a Dask cluster; it uses multithreading by default. Check out https://discourse.prefect.io/t/what-is-the-difference-between-a-daskexecutor-and-a-localdaskexecutor/374