
Jiri Klein

08/26/2022, 3:31 PM
Hi, I'm currently having some struggles with mapped tasks and `LocalDaskExecutor` in V1. I have two sets of mapped tasks, each set executing in parallel. Is there a way of telling Prefect to wait for one set to fully finish? I tried task triggers with the `all_successful` flag, but to no avail: the second set of mapped tasks fails with `TriggerFailed`. A Python pseudocode sample would be, e.g.:
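from typing import Any

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
from prefect.triggers import all_successful


def embarrassingly_parallel_func_1(input: Any):  # placeholder for the real work
    return input


def embarrassingly_parallel_func_2(input: Any):  # placeholder for the real work
    return input


@task(name="Task A")
def task_a(input: Any):
    return embarrassingly_parallel_func_1(input)


@task(name="Task B", trigger=all_successful)
def task_b(input: Any):
    return embarrassingly_parallel_func_2(input)


with Flow("Mapped tasks") as flow:
    flow.executor = LocalDaskExecutor()  # an executor instance, not a string
    _coll = [1, 2, 3]

    _a = task_a.map(input=_coll)
    _b = task_b.map(input=_a, upstream_tasks=[task_a])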
Unfortunately, for me some child tasks from `task_b.map` either begin executing BEFORE all of the `task_a.map` children are finished, OR they fail with `TriggerFailed`. Does anyone have any experience with this?

Nate

08/29/2022, 7:46 PM
Hi @Jiri Klein, here's a functional example of how you could prevent `task_b` from beginning its mapped tasks until all `task_a` results are materialized:
from prefect import task, Flow
from prefect.tasks.control_flow.filter import FilterTask
from prefect.executors import LocalDaskExecutor
from prefect.engine.signals import SKIP
from time import sleep


# Reduce step: receives ALL mapped results of task_a at once, then drops
# exceptions, SKIP signals and None before task_b is mapped over them
assert_results = FilterTask(
    filter_func=lambda x: not isinstance(x, (BaseException, SKIP, type(None))),
    name="Filter Mapped Results",
)


@task(name="Task A", log_stdout=True)
def task_a(x: int):
    print(f"starting task a on x={x}")
    if x == 2:
        sleep(2)  # make one mapped child deliberately slow
    return x


@task(name="Task B", log_stdout=True)
def task_b(x: int):
    print(f"starting task b on x={x}")


with Flow("Test order", executor=LocalDaskExecutor()) as flow:
    _coll = [2, 1, 3]

    _a = task_a.map(_coll)

    # Checkpoint: the FilterTask is not mapped, so it waits for every task_a child
    asserted_a_results = assert_results(_a)

    _b = task_b.map(asserted_a_results)

if __name__ == "__main__":
    flow.run()
The main point here is to use the `FilterTask` as a checkpoint to enforce that all mapped children of `task_a` are complete. Without the `FilterTask` you'll see the behaviour you described: while the mapped child of `task_a` that receives `x=2` is sleeping, `task_b` will start on the other elements without waiting for the `task_a` result for `x=2`.
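To make the checkpoint idea concrete outside Prefect, here is a minimal sketch of the same barrier pattern in plain Python using `concurrent.futures`. It is not Prefect's implementation: `task_a`, `task_b`, `keep` and `run_two_stage` are hypothetical stand-ins, and `keep` only mimics the `filter_func` above (it has no notion of Prefect's `SKIP` signal). Every stage-A call must finish before any stage-B call is submitted, and failed or `None` results are dropped in between.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep

# Shared event log so we can check the ordering afterwards
events = []
_lock = threading.Lock()


def _log(event):
    with _lock:
        events.append(event)


def task_a(x: int) -> int:
    # Hypothetical stage A: x == 2 is slow, negative inputs fail
    if x < 0:
        raise ValueError(f"bad input {x}")
    if x == 2:
        sleep(0.2)
    _log(("a_done", x))
    return x


def task_b(x: int) -> str:
    # Hypothetical stage B
    _log(("b_start", x))
    return f"b processed {x}"


def keep(result) -> bool:
    # Same spirit as the FilterTask filter_func: drop exceptions and None
    return not isinstance(result, BaseException) and result is not None


def run_two_stage(coll):
    events.clear()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Stage A: submit every element in parallel
        futures_a = [pool.submit(task_a, x) for x in coll]
        # Barrier: block until every stage-A future has finished
        wait(futures_a)
        # Turn raised exceptions into values so they can be filtered out
        results_a = [
            f.exception() if f.exception() is not None else f.result()
            for f in futures_a
        ]
        survivors = [r for r in results_a if keep(r)]
        # Stage B starts only after the barrier, on the surviving results
        futures_b = [pool.submit(task_b, r) for r in survivors]
        return [f.result() for f in futures_b]


if __name__ == "__main__":
    print(run_two_stage([2, 1, 3]))
```

Because stage B is only submitted after `wait(futures_a)` returns, the slow `x=2` call can no longer be overtaken, which is the ordering the `FilterTask` enforces in the flow.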
Just FYI, it's also recommended to set the flow's executor in the `Flow(...)` constructor (`__init__`), as I did above.

Jiri Klein

09/01/2022, 9:05 AM
Fantastic, this works. Thank you @Nate

Nate

09/01/2022, 5:39 PM
sure thing 😄