# ask-community
t
Hi all, we have a flow which accepts a string array as input; for each item, 3 tasks (get, map, save) are executed. Until now we have been using
flow.environment = LocalEnvironment(executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers))
to achieve the following execution example: get['string1'], map['string1'], save['string1'] -> get['string2'], map['string2'], save['string2'] -> get['string3']..., but we experienced some issues with the LocalDaskExecutor, so we want to use the DaskExecutor instead and are struggling to configure it to achieve the same result. Currently we use
flow.executor = DaskExecutor()
and the tasks run like this: get['string1'], get['string2'], get['string3'] -> map['string1'], map['string2'], map['string3'] -> save['string1']..., which is not resource-effective. How can we configure the DaskExecutor to achieve the first execution example?
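For reference, a minimal sketch of our setup (the task bodies are placeholders; only the get -> map -> save chain matters):
Copy code
from prefect import task, Flow
from prefect.environments import LocalEnvironment
from prefect.executors import LocalDaskExecutor


@task
def get(item):
    # placeholder: fetch the data for one item
    return f"data for {item}"


@task
def map_item(data):
    # placeholder: transform one item
    return data.upper()


@task
def save(data):
    # placeholder: persist one item
    print(data)


with Flow("example") as flow:
    fetched = get.map(["string1", "string2", "string3"])
    mapped = map_item.map(fetched)
    save.map(mapped)

flow.environment = LocalEnvironment(
    executor=LocalDaskExecutor(scheduler="threads", num_workers=3)
)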
👍 1
k
Hi @Tihomir Dimov! What version of Prefect are you on?
t
Hi @Kevin Kho, we are on 0.14.16.
The goal is to use DaskExecutor to achieve depth-first execution.
k
It should be depth first by default already. Can you try this script:
Copy code
import time
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
import prefect


@task
def times():
    return [1, 5, 10]


@task
def sleep(x):
    logger = prefect.context.get("logger")
    logger.info(f'sleeping for {x}')
    time.sleep(x)
    logger.info(f'done sleeping for {x}')
    return x


with Flow("mapping-test") as flow:
    sleep.map(sleep.map(times))


flow.run(executor=LocalDaskExecutor())
Could you replace LocalDaskExecutor with your DaskExecutor? I don’t think it’s related, but are you aware that Execution Environments are being deprecated?
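For reference, the replacement is to attach the executor directly to the flow; a minimal sketch:
Copy code
from prefect.executors import DaskExecutor

# old (deprecated) style:
#   flow.environment = LocalEnvironment(executor=DaskExecutor())
# new style:
flow.executor = DaskExecutor()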
t
Yes, we are trying to use the DaskExecutor but we cannot configure it to run depth-first. In your example above you use the LocalDaskExecutor.
k
I would need to spin up a cluster (which is fine) but that won’t be reproducible. Is it possible for you to test this snippet on a DaskExecutor connected to a cluster? I can try it too and report back to you in a bit.
I spun up a cluster on Coiled and tried this code (the only difference being that I passed my cluster to a DaskExecutor). I did indeed get depth-first execution. I ran the 1s, 5s, and 10s tasks and you can see the wait times below.
Where does your Dask cluster live? I would try checking the resources.
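If your cluster already exists, you can point the executor at it; a sketch, where the address is a placeholder for your scheduler's:
Copy code
from prefect.executors import DaskExecutor

# hypothetical scheduler address; use your cluster's actual one
flow.executor = DaskExecutor(address="tcp://scheduler-host:8786")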
t
Hi @Kevin Kho, It turned out that the problem was due to the timeout of the task. Here is a script which causes the issue:
Copy code
import time
from datetime import timedelta

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
import prefect


@task(max_retries=3, retry_delay=timedelta(minutes=5), timeout=60)
def get(x):
    logger = prefect.context.get("logger")
    logger.info(f'get for {x}')
    time.sleep(x)
    return x


@task
def transform(x):
    logger = prefect.context.get("logger")
    logger.info(f'transform for {x}')
    time.sleep(x)
    return x


with Flow("mapping-test") as flow:
    result = get.map([5, 10])
    transform.map(result)

    flow.executor = LocalDaskExecutor(num_workers=1)


flow.run()
If we remove the timeout=60 everything works just fine. How can we solve the error that occurs with the timeout=60 on the task?
k
Thanks for finding the cause. I will try to recreate this on my end sometime tomorrow.
Ok, I got your code and ran a couple of variations. I did not find that the timeout changes anything (except that some logs don't seem to show). But I did confirm what you are describing: in some cases the executor picks up tasks on the same breadth level rather than going depth-first. I'll ask the team for more details and get back to you.
🙏 1
t
Here is the error I get when there is a timeout:
Copy code
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

File ".../python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File ".../python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File ".../python3.8/site-packages/prefect/utilities/executors.py", line 348, in run_task_with_timeout
    return run_with_multiprocess_timeout(
  File ".../python3.8/site-packages/prefect/utilities/executors.py", line 266, in run_with_multiprocess_timeout
    run_process.start()
  File ".../python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File ".../python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File ".../python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File ".../python3.8/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File ".../python3.8/multiprocessing/spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File ".../python3.8/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError
k
You only get this with the timeout? That’s interesting. I encountered this before, but it was due to Dask not being installed correctly. Will explore this also.
t
Yes, if there is no timeout in the above example, the flow works fine.
k
Hi @Tihomir Dimov, about depth-first execution: Dask prefers it, but it is not guaranteed and there is no way to control it. More often than not, tasks will be executed in a depth-first manner, but not always.
On the second issue, can you try defining flow.executor outside the with statement and see if you get the same error? I suspect that is causing the issue.
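Something like this, sketched against your snippet above:
Copy code
with Flow("mapping-test") as flow:
    result = get.map([5, 10])
    transform.map(result)

# assign the executor at module level, outside the `with` block
flow.executor = LocalDaskExecutor(num_workers=1)

flow.run()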
t
Hi @Kevin Kho, thank you very much for your time and quick answers, I appreciate it. Does the Prefect team plan to introduce some way to control depth-first vs. breadth-first execution? If the flow.executor is outside of the with block, the same error continues to occur. If we put the flow.executor assignment under an if __name__ == '__main__': guard, everything seems to work properly (sketched below).
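Roughly, the structure that works for us (task and flow definitions as in the example above):
Copy code
# task and flow definitions as above ...

# The multiprocess timeout spawns a child process that re-imports this
# module, so the module-level run code has to be guarded:
if __name__ == '__main__':
    flow.executor = LocalDaskExecutor(num_workers=1)
    flow.run()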
k
Hey Tihomir, the issue is that there is no way to force Dask to fully execute depth-first at the moment. Are you on Windows?
t
Hey Kevin, on Linux.
k
Ok, I’ll look into that other error you’re seeing.
What happens if you do LocalDaskExecutor("processes")?
You are right though, the timeout fails sometimes. It is also in the docs here. This is a known issue and I don’t think there is more we can do. I would maybe just continue using if __name__ == '__main__': if it worked.
t
Ok, thank you very much!