Jiri Klein
08/26/2022, 3:31 PM
With the LocalDaskExecutor in V1, I have two sets of mapped tasks, each set executing in parallel. Is there a way to tell Prefect to wait for one set to finish completely before the other starts? I tried task triggers with the all_successful flag, but to no avail: the second set of mapped tasks fails with TriggerFailed.
A Python pseudocode sample:
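```python
from typing import Any

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
from prefect.triggers import all_successful

@task(name="Task A")
def task_a(input: Any):
    # embarrassingly_parallel_func_1 is a placeholder for the real work
    return embarrassingly_parallel_func_1()

@task(name="Task B", trigger=all_successful)
def task_b(input: Any):
    # embarrassingly_parallel_func_2 is a placeholder for the real work
    return embarrassingly_parallel_func_2(input)

with Flow("two-mapped-stages") as flow:
    flow.executor = LocalDaskExecutor()  # an Executor instance, not a string
    _coll = [1, 2, 3]
    _a = task_a.map(input=_coll)
    _b = task_b.map(input=_a, upstream_tasks=[task_a])
```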
Unfortunately, some child tasks from task_b.map either begin executing BEFORE all of the task_a.map children are finished, OR they fail with TriggerFailed. Does anyone have experience with this?
Nate
08/29/2022, 7:46 PM
Here's one way to prevent task_b from beginning its mapped tasks until all task_a results are materialized:
```python
from time import sleep

from prefect import task, Flow
from prefect.engine.signals import SKIP
from prefect.executors import LocalDaskExecutor
from prefect.tasks.control_flow.filter import FilterTask

# Non-mapped "reduce" step: it receives the list of ALL task_a results,
# so it cannot run until every mapped child of task_a has finished.
assert_results = FilterTask(
    filter_func=lambda x: not isinstance(x, (BaseException, SKIP, type(None))),
    name="Filter Mapped Results",
)

@task(name="Task A", log_stdout=True)
def task_a(x: int):
    print(f"starting task a on x={x}")
    if x == 2:
        sleep(2)  # simulate one slow mapped child
    return x

@task(name="Task B", log_stdout=True)
def task_b(x: int):
    print(f"starting task b on x={x}")

with Flow("Test order", executor=LocalDaskExecutor()) as flow:
    _coll = [2, 1, 3]
    _a = task_a.map(_coll)
    asserted_a_results = assert_results(_a)  # checkpoint between the two maps
    _b = task_b.map(asserted_a_results)

if __name__ == "__main__":
    flow.run()
```
The main point here is to use the FilterTask as a checkpoint that enforces that all mapped children of task_a are complete.
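The checkpoint works because a non-mapped task that takes a mapped result receives the whole list of child results, so it cannot start until every child has finished. Below is a library-agnostic sketch of that same barrier idea using Python's `concurrent.futures`, not Prefect's API. The functions `task_a`, `task_b`, `keep` and `run_with_barrier` are hypothetical stand-ins for the flow above, and the `None` returned for `x == 3` is an assumption added only to show the filtering step.

```python
from concurrent.futures import ThreadPoolExecutor, wait
import threading
import time

events = []               # order in which steps start/finish
lock = threading.Lock()

def record(name):
    with lock:
        events.append(name)

def task_a(x):
    record(f"a-start-{x}")
    if x == 2:
        time.sleep(0.2)   # the slow child, like sleep(2) in the flow above
    record(f"a-end-{x}")
    if x == 3:
        return None       # hypothetical empty result that the filter drops
    return x

def task_b(x):
    record(f"b-start-{x}")
    return x * 10

def keep(result):
    # Same predicate idea as filter_func above: drop exceptions and None.
    return not isinstance(result, (BaseException, type(None)))

def run_with_barrier(coll):
    with ThreadPoolExecutor(max_workers=4) as pool:
        a_futures = [pool.submit(task_a, x) for x in coll]
        wait(a_futures)   # barrier: block until every task_a child is done
        a_results = []
        for f in a_futures:
            exc = f.exception()
            a_results.append(exc if exc is not None else f.result())
        filtered = [r for r in a_results if keep(r)]
        # Only now does the second "map" start; pool.map preserves order.
        return list(pool.map(task_b, filtered))

if __name__ == "__main__":
    print(run_with_barrier([2, 1, 3]))
    print(events)
```

Every `a-*` event is recorded before any `b-*` event, which is the ordering the FilterTask gives you in the flow. Removing the `wait(...)` barrier and chaining each `task_b` directly onto its own `task_a` future would reproduce the interleaving described in the question.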
Without the FilterTask, you'll see the behaviour you described: while the mapped task_a child that receives x=2 is sleeping, the task_b children for the other elements start without waiting for the x=2 task_a result.
Also, you can pass flow executors in the Flow(...) object's __init__, as I did above.
Jiri Klein
09/01/2022, 9:05 AM
Nate
09/01/2022, 5:39 PM