    Ievgenii Martynenko

    6 months ago
    2) How do you handle multithreading inside custom modules? The task runs from Prefect, but the code is defined in a custom module. If there is a multithreading block like the one below, Prefect will not wait until all futures/threads are completed; it marks the task as successful immediately and exits.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    with ThreadPoolExecutor(max_workers=10, thread_name_prefix="flow_") as executor:
        futures = [executor.submit(self._run, df) for df in self.dfs]

        for future in as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                logger.exception(exc)
    Anna Geller

    6 months ago
    Normally with Prefect, you shouldn't have to use multithreading yourself, since when you use mapping, Prefect does that for you automatically.
    But if you want to use multithreading yourself, you would need to make sure that the Prefect context stays thread-safe. This is a bit tricky; there is some functionality built into Prefect to make it easier, but it would involve some work on your end. In contrast, if you leverage mapping with e.g. LocalDaskExecutor, which uses multithreading under the hood by default, then Prefect will make sure that the context stays thread-safe.
    btw thank you so much for posting separate questions in separate threads, you are a role model! 🙏
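    (For illustration, a minimal sketch of the mapping approach Anna describes, assuming Prefect 1.x; the task name, flow name, and dfs list are placeholders:)

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    dfs = [...]  # placeholder for the list of dataframes to process

    @task
    def process(df):
        # one unit of work; Prefect runs each mapped child as a separate task
        ...

    with Flow("mapped-flow") as flow:
        # mapping fans the work out; no manual ThreadPoolExecutor is needed
        process.map(dfs)

    # LocalDaskExecutor runs the mapped tasks on a local thread pool by default
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=10)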
    Ievgenii Martynenko

    6 months ago
    Yeap, for a Dask cluster it's clear: it controls multithreading itself, and all of that works fine. I was asking in the context of our library, which has built-in multithreading features that let you submit tasks from any command line; but when you take that library as-is and run it in Prefect, that won't work. So basically we need to wrap each library function into a Prefect Task and then run them through Prefect's mapping feature, as sketched below.
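    (A short sketch of that wrapping idea; mylib, Processor, _run, and dfs are hypothetical names based on the snippet earlier in this thread, and Prefect 1.x is assumed:)

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    from mylib import Processor  # hypothetical library class exposing _run(df) and dfs

    processor = Processor()

    @task
    def run_chunk(df):
        # call the library's per-chunk method directly, without its internal threading
        return processor._run(df)

    with Flow("wrapped-library-flow") as flow:
        # Prefect mapping replaces the library's own ThreadPoolExecutor
        run_chunk.map(processor.dfs)

    flow.executor = LocalDaskExecutor(scheduler="threads")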
    Kevin Kho

    6 months ago
    Coming from the other thread: if you pass the logger as an input into your multiprocessing function, it will work, but really just use LocalDaskExecutor, and then you get observability into your tasks as well.
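    (A minimal sketch of passing the logger in explicitly, assuming Prefect 1.x where the task logger comes from prefect.context; _run_chunk and the task below are illustrative:)

    import prefect
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from prefect import task

    def _run_chunk(df, logger):
        # the worker receives the logger as an argument instead of reading it
        # from prefect.context, which is not reliably visible from other threads
        logger.info("processing chunk")

    @task
    def process_all(dfs):
        logger = prefect.context.get("logger")  # resolve once, inside the task itself
        with ThreadPoolExecutor(max_workers=10, thread_name_prefix="flow_") as executor:
            futures = [executor.submit(_run_chunk, df, logger) for df in dfs]
            for future in as_completed(futures):
                future.result()  # re-raise worker exceptions so the task fails visibly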
    Anna Geller

    6 months ago
    Yeap, for a Dask cluster it's clear: it controls multithreading itself, and all of that works fine.
    Actually, LocalDaskExecutor doesn't use Dask; it uses multithreading. Check out https://discourse.prefect.io/t/what-is-the-difference-between-a-daskexecutor-and-a-localdaskexecutor/374
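    (To make the distinction concrete, a short sketch assuming Prefect 1.x; the scheduler address is a placeholder:)

    from prefect import Flow
    from prefect.executors import DaskExecutor, LocalDaskExecutor

    with Flow("executor-demo") as flow:
        ...

    # LocalDaskExecutor: a local thread (or process) pool, no Dask cluster involved
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=10)

    # DaskExecutor: submits task runs to an actual Dask scheduler
    flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")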