#prefect-community

Ievgenii Martynenko

03/24/2022, 9:44 AM
2) How do I handle multithreading inside custom modules? The task runs from Prefect, but the code is defined in a custom module. If there is a multithreading block like the one below, Prefect does not wait until all futures/threads are completed; it marks the task as successful immediately and exits.
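import concurrent.futures
from concurrent.futures import as_completed

# Runs inside a method of the custom module's class (uses self._run and self.dfs).
with concurrent.futures.ThreadPoolExecutor(max_workers=10,
                                           thread_name_prefix="flow_") as executor:
    futures = [executor.submit(self._run, df) for df in self.dfs]

    for future in as_completed(futures):
        try:
            future.result()
        except Exception as exc:
            # Failures are logged here but not re-raised.
            logger.exception(exc)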
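
For reference, here is a self-contained sketch of the same pattern outside Prefect. `run_one`, `run_all`, and the sample inputs are hypothetical stand-ins, not part of the library discussed here. With the standard library alone, the `with` block does not exit until every submitted future has finished, and collecting the exceptions lets the caller decide what to do with them instead of only logging them.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


def run_one(item):
    """Hypothetical stand-in for the library's per-DataFrame work."""
    time.sleep(0.05)
    if item < 0:
        raise ValueError(f"negative input: {item}")
    return item * 2


def run_all(items, max_workers=10):
    """Run every item in a thread pool and return (results, errors)."""
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix="flow_") as executor:
        futures = [executor.submit(run_one, item) for item in items]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:
                logger.exception(exc)
                errors.append(exc)
    # Leaving the with block calls executor.shutdown(wait=True),
    # so every future is done by the time execution reaches this line.
    assert all(f.done() for f in futures)
    return sorted(results), errors


results, errors = run_all([1, 2, -3, 4])
```

If the calling task should fail when any worker fails, one option is to raise after the loop whenever `errors` is non-empty, since logging alone leaves the caller looking successful.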

Anna Geller

03/24/2022, 1:34 PM
Normally with Prefect, you shouldn't have to use multithreading since when you use mapping, Prefect does that for you automatically
but if you want to use multithreading yourself, you would need to make sure that the Prefect context stays thread-safe. This is a bit tricky: there is some functionality built into Prefect to make that easier, but it would involve some work on your end. In contrast, if you leverage mapping and e.g. `LocalDaskExecutor`, which uses multithreading under the hood by default, then Prefect will make sure that the context stays thread-safe
btw thank you so much for posting separate questions in separate threads, you are a role model! 🙏

Ievgenii Martynenko

03/24/2022, 2:23 PM
Yep, for DaskCluster it's clear. It controls multithreading itself and all of that works fine. I was asking in the context of our library, which has built-in multithreading features that allow you to submit tasks from any command line. When you take that library as is and run it in Prefect, that won't work. So basically we need to wrap each library function in a Prefect task and then pass them through Prefect's mapping feature

Kevin Kho

03/24/2022, 2:46 PM
Coming from the other thread: if you pass the `logger` as an input into your multiprocessing function, it will work, but really just use LocalDask and then you get observability into your tasks as well
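
A minimal sketch of the "pass the logger as an input" idea, using only the standard library. It uses threads to match the snippet earlier in this thread. `process`, `ListHandler`, and the `task_logger_demo` logger are hypothetical names for illustration; in a real flow the logger would be whatever the Prefect task obtained before starting the pool.

```python
import logging
from concurrent.futures import ThreadPoolExecutor


class ListHandler(logging.Handler):
    """Collects log messages so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def process(item, logger):
    # The logger arrives as an argument instead of being looked up
    # from thread-local state inside the worker thread.
    logger.info("processing %s", item)
    return item + 1


# Assumption: stand-in for the logger a task would hold before fanning out.
task_logger = logging.getLogger("task_logger_demo")
task_logger.setLevel(logging.INFO)
handler = ListHandler()
task_logger.addHandler(handler)

with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map returns results in input order.
    outputs = list(executor.map(lambda x: process(x, task_logger), [1, 2, 3]))
```

The point of the design is that the worker never depends on per-thread context: everything it needs is in its arguments.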

Anna Geller

03/24/2022, 3:34 PM
> Yep, for DaskCluster it's clear. It controls multithreading itself and all of that works fine.
Actually, `LocalDaskExecutor` doesn't use a distributed Dask cluster; it runs tasks locally and uses multithreading by default. Check out https://discourse.prefect.io/t/what-is-the-difference-between-a-daskexecutor-and-a-localdaskexecutor/374