#prefect-community

Constantino Schillebeeckx

04/12/2022, 1:55 PM
I'm having an issue with multiple layers of mapped tasks running out of the expected order 🧵
from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor


@task
def m1(x):
    time.sleep(x)
    print(x)


@task
def m2(x):
    print(x)


with Flow('foo') as f:
    l1 = m1.map([1, 2, 3, 4, 5])
    l2 = m2.map(['a', 'b'], upstream_tasks=[l1])

f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)
For the example above, I would like (and would expect) all children of the mapped task m1 to be executed before m2 begins (since m1 is listed in upstream_tasks) - however, that's not what I'm seeing (see screenshot)
Ah, I think I found the issue (this post helped) - I needed to wrap the upstream_tasks in unmapped
from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor
from prefect import unmapped


@task
def m1(x):
    time.sleep(x)
    print(x)


@task
def m2(x):
    print(x)


@task
def red(x):
    print('dummy reduce')


with Flow('foo') as f:
    l1 = m1.map([1, 2, 3, 4, 5])
    l2 = m2.map(['a', 'b'], upstream_tasks=[unmapped(l1)])

f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)
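
For anyone reading along, here is a rough way to picture why wrapping in unmapped changes the ordering. This is a toy model in plain Python, not Prefect's scheduler: the function name downstream_wait_times is made up for illustration, and it assumes that a mapped upstream is paired with downstream children by index, that unmapped makes every downstream child wait on the whole upstream list, and that all m1 children start at the same moment (plausible here, since num_workers=7 covers the 5 children).

```python
def downstream_wait_times(upstream_sleeps, n_downstream, unmapped_upstream):
    """Toy model (hypothetical helper, not a Prefect API).

    Returns, for each downstream child, how many seconds it waits
    before all of the upstream children it depends on have finished.
    """
    waits = []
    for i in range(n_downstream):
        if unmapped_upstream:
            # Assumption: unmapped(l1) makes each child depend on every m1 child.
            deps = upstream_sleeps
        else:
            # Assumption: a mapped upstream is paired by index, so child i
            # only depends on m1 child i.
            deps = upstream_sleeps[i:i + 1]
        waits.append(max(deps))
    return waits


sleeps = [1, 2, 3, 4, 5]  # the values passed to m1.map in the flow above

print(downstream_wait_times(sleeps, 2, unmapped_upstream=False))  # [1, 2]
print(downstream_wait_times(sleeps, 2, unmapped_upstream=True))   # [5, 5]
```

Under those assumptions, the first flow lets m2 print 'a' after about 1 second, while m1 is still sleeping for 3, 4 and 5, which would look like the out-of-order output described above. With unmapped, both m2 children wait for the slowest m1 child.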

Anna Geller

04/12/2022, 2:30 PM
Nice work figuring this out and thanks for sharing the solution!