Ivan Ksaver Šušnjara

    1 year ago
    Hello. I'm having a problem using LocalDaskExecutor with the "processes" scheduler. When calling the flow.run() method, the following error appears:
    RuntimeError: 
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.
    
            This probably means that you are not using fork to start your
            child processes and you have forgotten to use the proper idiom
            in the main module:
    
                if __name__ == '__main__':
                    freeze_support()
                    ...
    
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce an executable.
    What is more intriguing, some flows continue and complete nonetheless, while others require a manual KeyboardInterrupt (Ctrl+C), after which they start executing their respective tasks. Dask provides a solution for the issue (https://github.com/dask/distributed/issues/2520); however, I don't see a workaround for this in Prefect. The problem does not appear when "threads" is passed instead of "processes" as LocalDaskExecutor's scheduler argument. I'm using Prefect version 0.14.16 with Python 3.8.7. Has anyone faced the same issue? Thanks in advance!
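For context, here is a minimal standard-library sketch (not Prefect code) of the idiom the error message asks for. It assumes the "spawn" start method. That is the default on Windows and, since Python 3.8, on macOS. As far as I know, dask's multiprocessing scheduler also uses "spawn" by default through its `multiprocessing.context` setting.

With "spawn", every worker process re-imports the main module. Anything that starts processes must therefore sit behind the `__main__` guard. That would also explain why the "threads" scheduler is unaffected. `square` and `run_parallel` are hypothetical names used only for illustration.

```python
import multiprocessing as mp
from typing import List


def square(x: int) -> int:
    # Runs inside a worker process. It must be defined at module level so
    # the spawned child can find it when it re-imports this module.
    return x * x


def run_parallel(values: List[int]) -> List[int]:
    # Explicitly request "spawn" so the behaviour is the same on every OS.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    # Without this guard, each spawned child would call run_parallel() again
    # while importing the module and hit the "bootstrapping phase" RuntimeError.
    print(run_parallel([1, 2, 3]))  # [1, 4, 9]
```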
    Kevin Kho

    1 year ago
    Hi @Ivan Ksaver Šušnjara, I have seen this issue recently. Do you have tasks with retries?
    Ivan Ksaver Šušnjara

    1 year ago
    no
    Kevin Kho

    1 year ago
    I have seen this worked around by attaching the executor
    flow.executor = LocalDaskExecutor()
    inside the
    if __name__ == "__main__":
    block.
    Ivan Ksaver Šušnjara

    1 year ago
    Will give it a try, thanks. I'll let you know the results.
    Unfortunately, the
    if __name__ == "__main__"
    block does not help here.
    Kevin Kho

    1 year ago
    Did you try moving the whole Flow definition into the
    if __name__ == "__main__"
    block?
    Ivan Ksaver Šušnjara

    1 year ago
    In that case it returns
    NameError: name 'flow' is not defined
    I'm using a context manager (a with statement in Python) to define the flow, and I'm trying to run it from the same script by calling the flow.run() method.
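One possible explanation for a NameError after moving code under the guard is an assumption, since the full script isn't shown in the thread. With "spawn", names that are assigned only inside the `if __name__ == "__main__":` block never exist in the child processes. Any worker code that looks them up as globals therefore fails.

The standard-library sketch below reproduces that and shows the usual fix: pass the value explicitly. `FACTOR`, `scale_global` and `scale` are hypothetical names.

```python
import multiprocessing as mp
from functools import partial


def scale_global(x: int) -> int:
    # Looks up FACTOR as a module global at call time; under "spawn" the
    # child never executes the guarded block, so FACTOR is missing there.
    return x * FACTOR


def scale(x: int, factor: int) -> int:
    # Receives everything it needs as arguments, so it works in any process.
    return x * factor


if __name__ == "__main__":
    FACTOR = 10  # exists only in the parent process
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        try:
            pool.map(scale_global, [1, 2])
        except NameError as exc:
            # The worker's NameError is re-raised here in the parent.
            print("worker failed:", exc)
        print(pool.map(partial(scale, factor=FACTOR), [1, 2]))  # [10, 20]
```

With the "fork" start method the first call would succeed, because forked children inherit the parent's globals. That is why the sketch pins "spawn".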
    Kevin Kho

    1 year ago
    from prefect import Flow, task
    import prefect
    from prefect.executors import LocalDaskExecutor
    
    @task
    def mytask():
        logger = prefect.context.get('logger')
        logger.info('hello')
    
    
    if __name__ == "__main__":
        # Build and run the flow only in the main process.
        with Flow('test') as flow:
            mytask()
        # LocalDaskExecutor defaults to scheduler="threads";
        # pass scheduler="processes" for process-based parallelism.
        flow.executor = LocalDaskExecutor()
        flow.run()
    Like this?
    Actually, with the flow defined at module level, this should be enough:
    if __name__ == "__main__":
        flow.run()
    Ivan Ksaver Šušnjara

    1 year ago
    Yeah, the logic is as in your example. I'll give it a try, thanks.
    It works, but it loses parallelism this way, i.e. it behaves like a plain LocalExecutor. The main reason for using LocalDaskExecutor was to parallelize flows.
    Michael Hadorn

    1 year ago
    @Kevin Kho @Ivan Ksaver Šušnjara How did you solve this? Same problem here. Anyway, I guess it's sadly not by design to generate a flow inside functions/classes. I had some trouble there too.