Aric Huang
02/19/2022, 1:38 AM
`map`
- With the following sample flow, I was expecting the mapped task `f` to run concurrently with `wait` because there are no dependencies and `LocalDaskExecutor` is being used. However, the behavior I see is that only `wait`'s mapped tasks get executed, so `f` is not executed until all the `wait` tasks return.
from prefect import Flow, task
import time
from prefect.executors import LocalDaskExecutor

@task
def f(x):
    return x*2

@task
def wait(x):
    time.sleep(x)

with Flow("test") as flow:
    a = list(range(4))
    wait.map(a)
    result = f.map(a)

flow.executor = LocalDaskExecutor()
`wait[i]` tasks all getting executed first and `f[i]` not running until all the `wait`s are complete
Retrieving local flow... Done
Running flow locally...
└── 01:37:45 | INFO | Beginning Flow run for 'test'
└── 01:37:45 | INFO | Task 'wait': Starting task run...
└── 01:37:45 | INFO | Task 'wait': Finished task run for task with final state: 'Mapped'
└── 01:37:45 | INFO | Task 'f': Starting task run...
└── 01:37:45 | INFO | Task 'f': Finished task run for task with final state: 'Mapped'
└── 01:37:45 | INFO | Task 'wait[0]': Starting task run...
└── 01:37:45 | INFO | Task 'wait[1]': Starting task run...
└── 01:37:45 | INFO | Task 'wait[3]': Starting task run...
└── 01:37:45 | INFO | Task 'wait[0]': Finished task run for task with final state: 'Success'
└── 01:37:45 | INFO | Task 'wait[2]': Starting task run...
└── 01:37:46 | INFO | Task 'wait[1]': Finished task run for task with final state: 'Success'
└── 01:37:47 | INFO | Task 'wait[2]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO | Task 'wait[3]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO | Task 'f[0]': Starting task run...
└── 01:37:48 | INFO | Task 'f[2]': Starting task run...
└── 01:37:48 | INFO | Task 'f[2]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO | Task 'f[0]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO | Task 'f[3]': Starting task run...
└── 01:37:48 | INFO | Task 'f[1]': Starting task run...
└── 01:37:48 | INFO | Task 'f[3]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO | Task 'f[1]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO | Flow run SUCCESS: all reference tasks succeeded
`map`? I tried adjusting `num_workers` for `LocalDaskExecutor` but behavior stayed the same
Kevin Kho
`DaskExecutor` prefers depth first while `LocalDaskExecutor` can have breadth first
Aric Huang
02/19/2022, 4:36 AM
`wait` tasks, it would be able to start running some `f` tasks without having to wait. But varying `num_workers` had no effect. So I must be missing something
Kevin Kho
Aric Huang
02/19/2022, 4:47 AM
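
For readers following the thread, the depth-first versus breadth-first distinction Kevin mentions can be illustrated with a toy ordering sketch. This is not Prefect's or Dask's scheduling code: the helper names `breadth_first_order` and `depth_first_order` are hypothetical, the sketch is sequential, and it only shows the order in which mapped children would be started under each strategy, assuming the mapped tasks are visited in a fixed order.

```python
from typing import List


def breadth_first_order(task_names: List[str], n_items: int) -> List[str]:
    """Start every mapped child of one task before moving to the next task."""
    order = []
    for name in task_names:
        for i in range(n_items):
            order.append(f"{name}[{i}]")
    return order


def depth_first_order(task_names: List[str], n_items: int) -> List[str]:
    """For each mapped index, start that index's child of every task in turn."""
    order = []
    for i in range(n_items):
        for name in task_names:
            order.append(f"{name}[{i}]")
    return order


if __name__ == "__main__":
    # Hypothetical illustration using the task names from the flow above.
    print(breadth_first_order(["wait", "f"], 4))
    print(depth_first_order(["wait", "f"], 4))
```

The breadth-first ordering resembles the log above, where every `wait[i]` starts before any `f[i]`. A real executor also runs children in parallel, so this sketch alone does not explain why changing `num_workers` had no effect.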