Two independent `map` tasks are not parallel. For ...
# ask-community
u
Two independent `map` tasks do not run in parallel. For the code, see reply. `times` and `times2` run in parallel, but `sleep` and `sleep2` do not. Is there a limit in the implementation of `map`? There are application scenarios, such as a task operating different types of devices, where each type of device has two serial tasks but the number of devices is determined by parameters, for example (of course, this is pure serial Python):
```python
ret = []
for device in params.get('device_type1_list'):
    result = operate_device_type1_step2(operate_device_type1_step1(device))
    ret.append(result)
for device in params.get('device_type2_list'):
    result = operate_device_type2_step2(operate_device_type2_step1(device))
    ret.append(result)
# do something for ret
handle(ret)
```
Here, each device can be processed in parallel, but the `steps` are serial. So is there a solution for this situation?
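For reference, the behaviour being asked for (devices in parallel, the two steps serial within each device) can be sketched in plain Python with a thread pool. This is only an illustration outside Prefect: the four `operate_device_*` functions are hypothetical stand-ins for the real device operations, and `run_type1`, `run_type2`, `run_all` and `max_workers` are names assumed for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the real device operations.
def operate_device_type1_step1(device):
    return f"{device}:t1s1"


def operate_device_type1_step2(value):
    return f"{value}:t1s2"


def operate_device_type2_step1(device):
    return f"{device}:t2s1"


def operate_device_type2_step2(value):
    return f"{value}:t2s2"


def run_type1(device):
    # Both steps run inside one job, so they stay serial per device.
    return operate_device_type1_step2(operate_device_type1_step1(device))


def run_type2(device):
    return operate_device_type2_step2(operate_device_type2_step1(device))


def run_all(params, max_workers=4):
    # One job per device; different devices may run concurrently.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_type1, d) for d in params.get("device_type1_list", [])]
        futures += [pool.submit(run_type2, d) for d in params.get("device_type2_list", [])]
        # Collect results in submission order, not completion order.
        return [f.result() for f in futures]


ret = run_all({"device_type1_list": ["a", "b"], "device_type2_list": ["c"]})
```

Submitting both device types to the same pool is what lets a type 2 device start before every type 1 device has finished.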
k
Hey, I think there are two different scenarios here. Prefect can currently support one of them and not the other. So first, the one that Prefect can’t support, with the sleep: you get an execution like the one below. If you have two independent mapped tasks, you are right that they execute one at a time. This is recorded in this issue (and please feel free to comment there).
The second one you describe is, I think, a bit different. Say you have a list of items 1, 2, 3 and each needs three tasks A, B, C. If you do:
```python
with Flow(...) as flow:
    a = A.map([1, 2, 3])
    b = B.map(a)
    c = C.map(b)
```
This will run in a depth-first manner on the `DaskExecutor`. With the `LocalDaskExecutor`, you can sometimes see breadth-first execution, but this is something you can’t enforce, as Dask is the one deciding which tasks to run. So for the second example you describe, I think you can combine `device_type1_list` and `device_type2_list` and then map over them like above?
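To make the two orderings concrete, here is a toy sketch in plain Python. It only lists the order in which a single worker would visit the (task, item) pairs under each strategy. It is not how Dask schedules work, and the names `items`, `stages`, `depth_first` and `breadth_first` are assumptions of this sketch.

```python
items = [1, 2, 3]
stages = ["A", "B", "C"]

# Depth-first: push one item through A, B and C before starting the next item.
depth_first = [(stage, item) for item in items for stage in stages]

# Breadth-first: finish a whole stage for every item before moving to the next stage.
breadth_first = [(stage, item) for stage in stages for item in items]

print(depth_first)
print(breadth_first)
```

With depth-first ordering, item 1 can reach C without waiting for items 2 and 3 to finish A, which is what the per-device use case needs.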
Also, when you get the chance, could you please move the code to the thread so that we can keep the main channel cleaner? 😄
😄 1
u
I think it may be a problem with the `LocalDaskExecutor`. When I switch to `DaskExecutor`, it works fine.
The code has been moved here:
```python
import os
import time

import prefect
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor

@task
def times():
    return [1, 5, 10]

@task
def times2():
    return [2, 6, 12]

@task(log_stdout=True)
def sleep(x):
    time.sleep(x)
    return x

@task
def sleep2(x):
    time.sleep(x)
    return x

with Flow("dfe2") as flow:
    sleep.map(sleep.map(times))
    sleep2.map(sleep2.map(times2))

flow.executor = LocalDaskExecutor()
flow.register(project_name="example")
```
k
Thanks for moving the code!
u
Although switching to `DaskExecutor` worked as expected, there are still problems with `LocalDaskExecutor`. I’ll comment on that issue.
👍 1