# prefect-server
i
Hello. I'm having a problem with LocalDaskExecutor using "processes". When calling the flow.run() method, the following error appears:
```
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
```
What is more intriguing, some flows continue and complete nonetheless, while others require a manual KeyboardInterrupt (Ctrl+C), after which they start executing their respective tasks. Dask provides a solution for the issue (https://github.com/dask/distributed/issues/2520), however I don't see a workaround for this in Prefect. The problem does not appear when "threads" is passed instead of "processes" as LocalDaskExecutor's scheduler argument. I'm using Prefect version 0.14.16 with Python 3.8.7. Has anyone faced the same issue? Thanks in advance!
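The error text mentions "fork" because it only occurs when child processes are started with the spawn (or forkserver) method, where each worker starts a fresh interpreter and re-imports the main script. On Windows, and on macOS since Python 3.8, spawn is the interpreter default. The standard-library sketch below reports which method is in effect; `needs_main_guard` is a hypothetical helper name, and Dask may select its own start method rather than the interpreter default, so treat this only as a first check.

```python
import multiprocessing


def needs_main_guard() -> bool:
    """Hypothetical helper: True when new processes re-import the main module.

    With "spawn" and "forkserver", each child imports the main script again
    (under the name "__mp_main__"), so unguarded top-level code that starts
    processes runs again and triggers the bootstrapping RuntimeError.
    With "fork", the child is a copy of the parent and nothing is re-imported.
    """
    method = multiprocessing.get_start_method()
    return method in ("spawn", "forkserver")


if __name__ == "__main__":
    print("start method:", multiprocessing.get_start_method())
    print("needs the __main__ guard:", needs_main_guard())
```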
k
Hi @Ivan Ksaver Šušnjara, I have seen this issue recently. Do you have tasks with retries?
i
no
k
I have seen this worked around by attaching the executor with `flow.executor = LocalDaskExecutor()` under the `if __name__ == "__main__":` block.
i
Will give it a try, thanks. I will let you know the results.
The `if __name__ == "__main__"` block does not help here, unfortunately.
k
Did you try moving the whole Flow definition into the `if __name__ == "__main__"` block?
i
In that case it returns `NameError: name 'flow' is not defined`. I'm using a context manager (a `with` statement in Python) to define the flow, and trying to run it from the same script by calling the `flow.run()` method.
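One plausible reading of that NameError (the full script isn't shown, so this is an assumption): the flow is bound only inside the guard while another top-level line still uses it. When a spawned worker re-imports the script, `__name__` is `"__mp_main__"`, the guarded block is skipped, and the unguarded line fails. The sketch below reproduces this with `exec` from the standard library; `SCRIPT` and `import_as` are hypothetical names, and the string stands in for a real `Flow` object.

```python
SCRIPT = '''
if __name__ == "__main__":
    flow = "flow object"  # stands in for `with Flow("test") as flow: ...`

result = flow  # unguarded top-level use, like a stray `flow.run()`
'''


def import_as(module_name: str) -> str:
    """Execute SCRIPT as Python would when importing it under module_name."""
    namespace = {"__name__": module_name}
    exec(SCRIPT, namespace)
    return namespace["result"]


if __name__ == "__main__":
    # Parent process: the guard runs, so `flow` exists.
    print(import_as("__main__"))
    try:
        # A spawned child re-imports the script under this name.
        import_as("__mp_main__")
    except NameError as exc:
        print("a child would fail with:", exc)
```

The fix in this reading is to keep every use of `flow` inside the same guarded block that defines it, or to define the flow at top level and guard only the call that starts work.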
k
```python
from prefect import Flow, task
import prefect
from prefect.executors import LocalDaskExecutor


@task
def mytask():
    logger = prefect.context.get("logger")
    logger.info("hello")


if __name__ == "__main__":
    # Build, configure, and run the flow only in the main process.
    with Flow("test") as flow:
        mytask()
    flow.executor = LocalDaskExecutor()
    flow.run()
```
Like this?
Actually this should be enough:
```python
if __name__ == "__main__":
    flow.run()
```
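Outside Prefect, the same shape looks like this with only the standard library: the work function lives at module level (so a re-importing child can find it), and the code that starts worker processes runs only under the guard. `square` and `run_parallel` are illustrative names, not Prefect APIs; in the Prefect 0.14 script above, the guarded part corresponds to `flow.run()` (and, if process-based parallelism is wanted, setting `flow.executor = LocalDaskExecutor(scheduler="processes")`).

```python
import multiprocessing
from typing import List, Optional


def square(x: int) -> int:
    # Defined at module level so a spawned child can import it by name.
    return x * x


def run_parallel(values: List[int], start_method: Optional[str] = None) -> List[int]:
    # None means "use the interpreter's default start method".
    ctx = multiprocessing.get_context(start_method)
    with ctx.Pool(processes=2) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    # Only the main process reaches this line; a spawned child re-imports the
    # module as "__mp_main__" and skips it, so no process is started during
    # the child's bootstrapping phase.
    print(run_parallel([1, 2, 3, 4]))  # prints [1, 4, 9, 16]
```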
i
Yeah, the logic is as in your example. I'll give it a try, thanks.
It works, but it loses parallelism this way, i.e. it behaves like a plain LocalExecutor. The main reason for using LocalDaskExecutor was to parallelize flows.
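One way to check whether work actually leaves the parent process is to have each unit of work report `os.getpid()`; in a Prefect task this could simply be logged. Note that `LocalDaskExecutor()` with no arguments uses the "threads" scheduler, so the snippet earlier in the thread would not start processes unless `scheduler="processes"` is passed. The sketch below shows the PID check with the standard library; `worker_pid` and `pids_used` are hypothetical names.

```python
import multiprocessing
import os
from typing import List, Optional


def worker_pid(_: int) -> int:
    # Report which OS process executed this unit of work.
    return os.getpid()


def pids_used(n_items: int, start_method: Optional[str] = None) -> List[int]:
    ctx = multiprocessing.get_context(start_method)
    with ctx.Pool(processes=2) as pool:
        return pool.map(worker_pid, range(n_items))


if __name__ == "__main__":
    parent = os.getpid()
    pids = pids_used(8)
    # If every PID equals the parent's, nothing ran in a separate process.
    print("parent:", parent, "workers:", sorted(set(pids)))
    print("ran outside the parent:", all(pid != parent for pid in pids))
```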
m
@Kevin Kho @Ivan Ksaver Šušnjara How did you solve this? I have the same problem here. Anyhow, I guess that, sadly, generating a flow inside functions/classes is not supported by design. I had some trouble there as well.
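A common shape for building work inside a function while staying safe for process-based execution is a factory defined at module level and called only under the guard, with the functions sent to workers kept at module level. The standard-library sketch below shows that shape; `build_pipeline`, `apply_steps`, `double` and `increment` are hypothetical stand-ins for a function that would create a Prefect `Flow` and its tasks, and this thread does not establish whether Prefect 0.14 imposes further restrictions on flows built this way.

```python
import multiprocessing
from typing import Callable, List, Optional


def double(x: int) -> int:
    return 2 * x


def increment(x: int) -> int:
    return x + 1


def apply_steps(x: int) -> int:
    # Module-level so it can be pickled by reference; a function defined
    # inside build_pipeline() could not be pickled by the standard pickle module.
    return increment(double(x))


def build_pipeline(start_method: Optional[str] = None) -> Callable[[List[int]], List[int]]:
    """Hypothetical factory: returns a callable that runs the steps in worker processes."""
    ctx = multiprocessing.get_context(start_method)

    def run(values: List[int]) -> List[int]:
        # `run` itself stays in the parent; only apply_steps is sent to workers.
        with ctx.Pool(processes=2) as pool:
            return pool.map(apply_steps, values)

    return run


if __name__ == "__main__":
    pipeline = build_pipeline()
    print(pipeline([1, 2, 3]))  # prints [3, 5, 7]
```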