
Ivan Ksaver Šušnjara

04/22/2021, 1:37 PM
Hello. I'm having a problem with LocalDaskExecutor using the "processes" scheduler. When I call flow.run(), the following error appears:
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
What is more intriguing, some flows continue and complete nonetheless, while others require a manual KeyboardInterrupt (Ctrl+C), after which they start executing their respective tasks. Dask provides a solution for this issue (https://github.com/dask/distributed/issues/2520), but I don't see a workaround for it in Prefect. The problem does not appear when "threads" is used instead of "processes" as LocalDaskExecutor's scheduler argument. I'm using Prefect version 0.14.16 with Python 3.8.7. Has anyone faced the same issue? Thanks in advance!
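
For context on the traceback above: this RuntimeError is raised by Python's multiprocessing module. With the spawn start method (the default on Windows, and on macOS since Python 3.8), each child re-imports the main module, so any module-level code that starts processes runs again inside the child. The sketch below shows the idiom the message asks for, using only the standard library. It is an illustration, not Prefect or Dask internals, and the names square and run_in_processes are hypothetical.

```python
import multiprocessing as mp


def square(x):
    """Work run in a child process; defined at module level so it can be pickled."""
    return x * x


def run_in_processes(values, workers=2):
    # "spawn" starts a fresh interpreter per worker, which re-imports this module.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=workers) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    # Only the parent process enters this block. Spawned children import the
    # module under a different name and skip it, so no new pool is created
    # while a child is still bootstrapping.
    print(run_in_processes([1, 2, 3]))
```

Calling run_in_processes at module level, outside the guard, is the pattern that typically triggers the error when the script is run under spawn.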

Kevin Kho

04/22/2021, 1:41 PM
Hi @Ivan Ksaver Šušnjara, I have seen this issue recently. Do you have tasks with retries?

Ivan Ksaver Šušnjara

04/22/2021, 1:41 PM
no

Kevin Kho

04/22/2021, 1:44 PM
I have seen this worked around by attaching the
flow.executor = LocalDaskExecutor()
under the
if __name__ == "__main__":
block

Ivan Ksaver Šušnjara

04/22/2021, 1:45 PM
Will give it a try, thanks. I will let you know the results.
The
if __name__ == "__main__"
block does not help here, unfortunately.

Kevin Kho

04/22/2021, 2:17 PM
Did you try moving the whole Flow definition to
if __name__ == "__main__"
?

Ivan Ksaver Šušnjara

04/22/2021, 2:20 PM
In that case it raises
NameError: name 'flow' is not defined
I'm using a context manager (the with statement in Python) to define the flow, and trying to run it from the same script by calling flow.run().

Kevin Kho

04/22/2021, 2:40 PM
from prefect import Flow, task
import prefect
import logging
from prefect.executors import LocalDaskExecutor

@task
def mytask():
    logger = prefect.context.get('logger')
    logger.info('hello')


if __name__ == "__main__":
    with Flow('test') as flow:
        mytask()
    flow.executor = LocalDaskExecutor()
    flow.run()
Like this?
Actually this should be enough:
if __name__ == "__main__":
    flow.run()

Ivan Ksaver Šušnjara

04/23/2021, 6:28 AM
Yeah, the logic is as in your example. I'll give it a try, thanks.
It works, but loses parallelism this way, i.e. it behaves like a plain LocalExecutor. The main reason for using LocalDaskExecutor was to parallelize flows.

Michael Hadorn

06/11/2021, 12:47 PM
@Kevin Kho @Ivan Ksaver Šušnjara How did you solve this? Same problem here. Sadly, I guess generating a flow inside functions/classes is not something the design supports. I had some trouble there as well.