Constantino Schillebeeckx
04/12/2022, 1:55 PM
from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor

@task
def m1(x):
    time.sleep(x)
    print(x)

@task
def m2(x):
    print(x)

with Flow('foo') as f:
    l1 = m1.map([1, 2, 3, 4, 5])
    l2 = m2.map(['a', 'b'], upstream_tasks=[l1])

f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)
For the example above, I would like (and would expect) all children of the mapped task m1 to be executed before m2 begins (since m1 is in its upstream_tasks). However, that's not what I'm seeing (see screenshot).
Wrap the upstream_tasks in unmapped:
from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor
from prefect import unmapped

@task
def m1(x):
    time.sleep(x)
    print(x)

@task
def m2(x):
    print(x)

@task
def red(x):
    print('dummy reduce')

with Flow('foo') as f:
    l1 = m1.map([1, 2, 3, 4, 5])
    l2 = m2.map(['a', 'b'], upstream_tasks=[unmapped(l1)])

f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)
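Why this helps, as far as I understand it: in Prefect 1.x, anything passed to upstream_tasks of .map() is itself mapped over unless it is wrapped in unmapped, so each m2 child only waits for the m1 child at the same index. Here is a rough plain-Python picture of that rule. It is a simplified model and not Prefect's scheduling code; wait_set and its arguments are made-up names for illustration only.

```python
# Simplified model of the dependency rule, NOT Prefect's implementation.
# `wait_set` and its parameters are hypothetical names used for illustration.

def wait_set(child_index, upstream_children, unmapped_upstream):
    """Return the upstream children that one mapped child has to wait for."""
    if unmapped_upstream:
        # unmapped(l1): the upstream counts as a single dependency,
        # so every child waits for all of the upstream children.
        return list(upstream_children)
    # Plain l1: the upstream is mapped over as well, so child i
    # only waits for upstream child i.
    return [upstream_children[child_index]]


m1_children = ["m1[0]", "m1[1]", "m1[2]", "m1[3]", "m1[4]"]

# Models m2.map(['a', 'b'], upstream_tasks=[l1])
mapped = [wait_set(i, m1_children, unmapped_upstream=False) for i in range(2)]

# Models m2.map(['a', 'b'], upstream_tasks=[unmapped(l1)])
reduced = [wait_set(i, m1_children, unmapped_upstream=True) for i in range(2)]

print(mapped)
print(reduced)
```

In the first case the two m2 children each depend on a single m1 child, which would match m2 starting before the slower m1 children have finished. In the second case both m2 children depend on all five m1 children.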
Anna Geller
04/12/2022, 2:30 PM