Tihomir Dimov
04/16/2021, 8:11 PM
flow.environment = LocalEnvironment(executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers))
to achieve the following execution order: get['string1'], map['string1'], save['string1'] -> get['string2'], map['string2'], save['string2'] -> get['string3']..., but we are experiencing some issues with LocalDaskExecutor, so we want to use DaskExecutor instead. However, we are struggling to configure it to achieve the same result. Currently we use flow.executor = DaskExecutor(), and the tasks run like this: get['string1'], get['string2'], get['string3'] -> map['string1'], map['string2'], map['string3'] -> save['string1']..., which is not resource-efficient. How can we configure DaskExecutor to achieve the first execution order?
Kevin Kho
Tihomir Dimov
04/16/2021, 8:19 PM
Tihomir Dimov
04/16/2021, 8:23 PM
Kevin Kho
import time
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
import prefect

@task
def times():
    return [1, 5, 10]

@task
def sleep(x):
    logger = prefect.context.get("logger")
    logger.info(f'sleeping for {x}')
    time.sleep(x)
    logger.info(f'done sleeping for {x}')
    return x

with Flow("mapping-test") as flow:
    sleep.map(sleep.map(times))

flow.run(executor=LocalDaskExecutor())
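The two orderings described in the question can be reproduced with a small, stdlib-only model. This is a hypothetical sketch, not how Prefect or Dask actually schedule mapped tasks: `breadth_first` runs every item through one stage before any item starts the next stage, while `depth_first` submits each item's whole get -> map -> save chain as a single job. The stage names and the string1/string2/string3 labels come from the question; `make_stage`, `breadth_first` and `depth_first` are invented for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stage names mirror the question; the functions below are hypothetical stand-ins.
STAGES = ("get", "map", "save")


def make_stage(name, log, lock):
    """Return a stage function that records "name[item]" in a shared log."""
    def stage(item):
        with lock:
            log.append(f"{name}[{item}]")
        return item
    return stage


def breadth_first(items, num_workers=1):
    """Finish each stage for every item before any item starts the next stage."""
    log, lock = [], threading.Lock()
    results = list(items)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for name in STAGES:
            stage = make_stage(name, log, lock)
            # list() waits for the whole stage to finish before the next one starts.
            results = list(pool.map(stage, results))
    return log


def depth_first(items, num_workers=1):
    """Run each item's full get -> map -> save chain as one job."""
    log, lock = [], threading.Lock()
    stages = [make_stage(name, log, lock) for name in STAGES]

    def chain(item):
        for stage in stages:
            item = stage(item)
        return item

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(chain, items))
    return log


if __name__ == "__main__":
    items = ["string1", "string2", "string3"]
    print(breadth_first(items))
    print(depth_first(items))
```

With `num_workers=1` both logs are deterministic and match the two orderings in the question. With more workers the depth-first chains can interleave, but no item has to wait for every other item to finish a stage, so intermediate results do not pile up between stages.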
Kevin Kho
Tihomir Dimov
04/16/2021, 8:58 PM
Kevin Kho
Kevin Kho
DaskExecutor). I did indeed get depth-first execution. I ran the 1s, 5s, and 10s tasks, and you can see the wait times below.
Kevin Kho
Tihomir Dimov
04/17/2021, 12:17 PM
import time
from datetime import timedelta
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
import prefect

@task(max_retries=3, retry_delay=timedelta(minutes=5), timeout=60)
def get(x):
    logger = prefect.context.get("logger")
    logger.info(f'get for {x}')
    time.sleep(x)
    return x

@task
def transform(x):
    logger = prefect.context.get("logger")
    logger.info(f'transform for {x}')
    time.sleep(x)
    return x

with Flow("mapping-test") as flow:
    result = get.map([5, 10])
    transform.map(result)
    flow.executor = LocalDaskExecutor(num_workers=1)

flow.run()
If we remove timeout=60, everything works just fine. How can we solve the error that occurs when the task has timeout=60?
Kevin Kho
Kevin Kho
Tihomir Dimov
04/18/2021, 5:24 PM
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.

This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:

    if __name__ == '__main__':
        freeze_support()
        ...

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.

  File ".../python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File ".../python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File ".../python3.8/site-packages/prefect/utilities/executors.py", line 348, in run_task_with_timeout
    return run_with_multiprocess_timeout(
  File ".../python3.8/site-packages/prefect/utilities/executors.py", line 266, in run_with_multiprocess_timeout
    run_process.start()
  File ".../python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File ".../python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File ".../python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File ".../python3.8/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File ".../python3.8/multiprocessing/spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File ".../python3.8/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError
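The traceback shows the timeout being enforced by `run_with_multiprocess_timeout`, which calls `run_process.start()`, with multiprocessing using the spawn start method (`popen_spawn_posix`). Under spawn, the child process re-imports the main script, so any unguarded module-level code that starts a process (here, `flow.run()`) runs again while the child is still bootstrapping, which is what raises this RuntimeError. The stdlib-only sketch below is a hypothetical helper, not Prefect's implementation: it enforces a timeout by running work in a spawned process and keeps every process-starting call under `if __name__ == '__main__':`.

```python
import multiprocessing
import time


def sleep_with_timeout(seconds, timeout):
    """Hypothetical helper: run time.sleep(seconds) in a spawned child process.

    Returns True if the child finished within `timeout` seconds, and False if
    it had to be terminated.
    """
    ctx = multiprocessing.get_context("spawn")
    # time.sleep is a stdlib function, so the spawned child can unpickle the
    # target without needing anything defined in this module.
    proc = ctx.Process(target=time.sleep, args=(seconds,))
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()  # timed out: stop the child and reap it
        proc.join()
        return False
    return proc.exitcode == 0


# Under spawn, the child re-imports this file; the guard keeps it from
# starting processes of its own while it is still bootstrapping.
if __name__ == "__main__":
    print(sleep_with_timeout(0.1, timeout=10))
    print(sleep_with_timeout(5, timeout=0.5))
```

For the flow in this thread, the corresponding change is to keep `flow.run()` (and anything else that triggers task execution) under the same guard, so the re-imported copy of the script only defines tasks and the flow.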
Kevin Kho
Tihomir Dimov
04/18/2021, 5:47 PM
Kevin Kho
Kevin Kho
flow.executor outside the with statement and see if you get the same error? I suspect that is causing the issue.
Tihomir Dimov
04/21/2021, 1:23 PM
flow.executor is outside of the with statement, the same error continues to occur. If we put flow.executor inside an if __name__ == '__main__': block, everything seems to work properly.
Kevin Kho
Tihomir Dimov
04/21/2021, 2:10 PM
Kevin Kho
Kevin Kho
LocalDaskExecutor("processes")?
Tihomir Dimov
04/21/2021, 3:00 PM