张强
10/10/2021, 1:21 AM
`map` tasks are not running in parallel.
For the code, see my reply. `times` and `times2` run in parallel, but `sleep` and `sleep2` do not. Is there a limitation in the implementation of `map`?
There are application scenarios such as a flow that operates on different types of devices. Each device type has two tasks that must run serially, but the number of devices is determined by parameters. For example (this is, of course, plain serial Python):
ret = []
for device in params.get('device_type1_list'):
    result = operate_device_type1_step2(operate_device_type1_step1(device))
    ret.append(result)
for device in params.get('device_type2_list'):
    result = operate_device_type2_step2(operate_device_type2_step1(device))
    ret.append(result)
# do something for ret
handle(ret)
Here, the devices can be processed in parallel, but the steps for each device are serial.
Is there a solution for this situation?
Kevin Kho
with Flow...
    a = A.map([1,2,3])
    b = B.map(a)
    c = C.map(b)
This will run in a depth-first manner with the `DaskExecutor`. With the `LocalDaskExecutor`, you can sometimes see breadth-first execution, but this is something you can’t enforce in Dask, as Dask is the one deciding which tasks to run.
So for the second example you describe, I think you can combine `device_type1_list` and `device_type2_list` and then map over them like above?
Kevin Kho
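To make the "devices in parallel, steps in series" shape concrete, here is a minimal sketch in plain Python that uses a standard-library thread pool rather than Prefect. It merges the two device lists into one, tags each entry with its device type, and runs both steps for a device back to back inside a single worker. The step functions, the `PIPELINES` table, and the device names are placeholders made up for illustration, not part of any real API.

```python
from concurrent.futures import ThreadPoolExecutor


# Placeholder step functions (hypothetical); real ones would talk to devices.
def type1_step1(device):
    return f"{device}:t1s1"


def type1_step2(value):
    return f"{value}:t1s2"


def type2_step1(device):
    return f"{device}:t2s1"


def type2_step2(value):
    return f"{value}:t2s2"


# Each device type maps to its ordered (serial) steps.
PIPELINES = {
    "type1": (type1_step1, type1_step2),
    "type2": (type2_step1, type2_step2),
}


def run_pipeline(item):
    """Run every step for one device, in order, inside a single worker."""
    device_type, device = item
    value = device
    for step in PIPELINES[device_type]:
        value = step(value)
    return value


def operate_all(params, max_workers=4):
    # Merge both lists into one list of (type, device) pairs.
    items = [("type1", d) for d in params.get("device_type1_list", [])]
    items += [("type2", d) for d in params.get("device_type2_list", [])]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the inputs.
        return list(pool.map(run_pipeline, items))


if __name__ == "__main__":
    params = {"device_type1_list": ["a", "b"], "device_type2_list": ["c"]}
    print(operate_all(params))
```

In Prefect terms, the same merged list could presumably feed two chained `.map` calls whose tasks dispatch on the device type, which is the combination suggested above.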
张强
10/10/2021, 8:37 AM
`LocalDaskExecutor`. When I switch to `DaskExecutor`, it works fine.
张强
10/10/2021, 8:40 AM
import os
import time

import prefect
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor


@task
def times():
    return [1, 5, 10]


@task
def times2():
    return [2, 6, 12]


@task(log_stdout=True)
def sleep(x):
    time.sleep(x)
    return x


@task
def sleep2(x):
    time.sleep(x)
    return x


with Flow("dfe2") as flow:
    sleep.map(sleep.map(times))
    sleep2.map(sleep2.map(times2))

flow.executor = LocalDaskExecutor()
flow.register(project_name="example")
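As a rough aid for reading the run time of this flow, the sketch below computes idealized wall-clock times for the two mapped chains under three assumed scheduling behaviours. It assumes zero scheduling overhead and enough workers for every mapped child, which a real executor will not match exactly. The helper names are made up for illustration.

```python
def chain_fully_serial(values, levels=2):
    # Every mapped child at every level runs one after another.
    return levels * sum(values)


def chain_parallel(values, levels=2):
    # Children run side by side; each child sleeps `levels` times in a row.
    return levels * max(values)


times = [1, 5, 10]   # feeds sleep.map(sleep.map(times))
times2 = [2, 6, 12]  # feeds sleep2.map(sleep2.map(times2))

# 1. Nothing runs in parallel at all.
all_serial = chain_fully_serial(times) + chain_fully_serial(times2)

# 2. Children within a chain overlap, but the two chains run one after the other.
chains_serial = chain_parallel(times) + chain_parallel(times2)

# 3. Everything that can overlap does overlap.
all_parallel = max(chain_parallel(times), chain_parallel(times2))

print(all_serial, chains_serial, all_parallel)  # 72 44 24
```

Under these assumptions, a run time near 24 seconds would suggest full overlap, while a run time well above that would suggest that some part of the flow is running serially.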
Kevin Kho
张强
10/11/2021, 2:47 AM
`DaskExecutor` ran as expected, but there are still problems with `LocalDaskExecutor`. I’ll comment on that issue.