# ask-community
e
Hi everyone, I just realized something about `merge`, and I find it kind of unexpected. Here goes: usually, when using something like `ifelse` and `merge`, the flow goes like this:
```
skip_if_true -> actual_task_for_false -> merge
skip_if_false -> actual_task_for_true -> merge
```
This works as expected. However, I tried to directly merge tasks that may or may not `SKIP`:
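```python
from prefect.tasks.control_flow import merge

# do_i_skip raises SKIP when skip=True; exactly one of these should run
skippable_1 = do_i_skip(skip=True)
skippable_2 = do_i_skip(skip=False)
merge(skippable_1, skippable_2)
```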
`merge` returns the `SKIP` signal raised in `skippable_1`, rather than the result of `skippable_2`. That is unexpected, at least for me. Is this expected behavior? Can I somehow connect these explicitly `SKIP`ping tasks to a `merge` directly?
k
Hi @emre! If I'm understanding correctly, there is a difference in that the first snippet shows mutually exclusive events, but `skippable_1` and `skippable_2` can both happen?
e
Logically, no. In my use case, exactly one of my tasks should not raise `SKIP`.
k
Could I see a code snippet of what your Flow code would look like there? Do you use `case`?
e
So the `Merge` task backing the `merge` statement receives the upstream tasks in order, iterates over their results in that order, and returns the first result that isn't `None`. If a task raises `SKIP` explicitly and is directly upstream of `merge`, the `Merge` task receives the raised `SKIP` signal object as that task's result. If a task is skipped because an upstream task skipped, `Merge` receives `None` instead.
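To make that ordering rule concrete, here is a plain-Python model of it. `SKIP` and `merge_results` below are hypothetical stand-ins, not Prefect's classes; the only behavior carried over is the one described in the message above: results are scanned in order and the first non-`None` value wins. An explicitly raised `SKIP` object is not `None`, so it gets returned, while a task skipped because of an upstream skip contributes `None` and is passed over.

```python
from typing import Any, Optional


class SKIP(Exception):
    """Hypothetical stand-in for Prefect's SKIP signal."""


def merge_results(*upstream_results: Any) -> Optional[Any]:
    """Return the first upstream result that is not None, in order."""
    for result in upstream_results:
        if result is not None:
            return result
    return None


# Skipped because *its* upstream skipped: contributes None, so it is passed over.
print(merge_results(None, "output of skippable_2"))

# Raised SKIP explicitly: the signal object itself acts as the "result", so it wins.
explicit_skip = SKIP("skipping on purpose")
print(repr(merge_results(explicit_skip, "output of skippable_2")))
```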
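```python
from prefect import Flow
from prefect.tasks.control_flow import merge

with Flow("abc") as f:
    some_string = "abc"
    x = definitely_do_something(some_string)
    # maybe_do_something raises SKIP when it decides not to run
    y = maybe_do_something(x)
    # take y if it ran, otherwise fall back to x
    merge(y, x)
```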
Essentially, I want to raise `SKIP` when I don't want to `maybe_do_something`; I think that will help me when monitoring runs in the UI.
k
I am wondering why you need `merge` in this case? It seems like defining two independent tasks and then connecting both to a downstream task with a configured trigger (`any_upstream`) is the way to do this.
Because it doesn't seem like `if-else` semantics are what create the different branches here.
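A plain-Python model of that trigger idea follows. The `State` enum and the `any_upstream_ran` function are hypothetical; `any_upstream` is the name used in the message above, and this sketch does not claim to reproduce any real Prefect trigger. It only illustrates the rule of firing a downstream task when at least one of several independent upstream tasks actually ran, with no output merging involved.

```python
from enum import Enum
from typing import Dict


class State(Enum):
    """Hypothetical, simplified task states (not Prefect's State classes)."""
    RAN = "ran"
    SKIPPED = "skipped"
    FAILED = "failed"


def any_upstream_ran(upstream_states: Dict[str, State]) -> bool:
    """Hypothetical trigger: fire the downstream task if any upstream ran."""
    return any(state is State.RAN for state in upstream_states.values())


# Three independent cases; only case_b actually ran.
states = {
    "case_a": State.SKIPPED,
    "case_b": State.RAN,
    "case_c": State.SKIPPED,
}
print(any_upstream_ran(states))
```

Adding a new case in this model only means adding another entry to the upstream mapping; the trigger rule itself does not change.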
e
I have 2 minor issues with that:
• I actually need 3 cases, and that might increase. I will need to update my downstream tasks' signatures every time I add a new case, and handle which one to use inside the task.
• I will have unrelated upstream tasks for my downstreams, and I will have to run the task even if those fail.
Whether `maybe_do_something` skips depends on the value of `x`, let's say. Maybe I should just use `switch` 😅
Anyways, I think we are getting sidetracked 😅. My main issue was that it felt weird to me that if a direct parent of `merge` skips explicitly, it doesn't work, but if a direct parent skips because of an upstream skip, `merge` works.
k
Yeah, it does seem unintuitive, but what I'm not sure about is whether that's a misuse of `merge`. I'll be looking into this, and it seems like either way we'll do something around the docs or the code.
j
This is an interesting problem @emre. I think it highlights some awkwardness in deferred conditional graphs. A couple of thoughts come to mind:
• `merge` is principally for merging the output of conditionally executed tasks. I think your explicit `SKIP` raise is erroneously being treated as the output, which, if so, might even be considered a bug (I'm not 100% sure I'm tracking the problem statement).
• Based on your description, I think you can modify @Kevin Kho's trigger-based solution slightly to achieve your purpose. Note that I'm assuming you don't actually need the output of your merged tasks; you're just trying to set up state dependencies between them and the downstream path:
  ◦ Use any task (it could be a custom `merge`, or just `prefect.Task()`) with the default `all_successful` trigger and `skip_on_upstream_skip=False` (note this is the setup for the built-in `Merge` task).
  ◦ Set that task as the state-based downstream of all your conditional tasks. You can do this one by one with `task.set_upstream()`, or in one swoop with `task.set_dependencies(upstream_tasks=[…])` to capture your possibly increasing number of upstreams.
  ◦ I think this will capture the behavior you need: execution will block until all conditional branches have either skipped or succeeded. At that point you can move on to your downstream tasks.
• My solution above does not take output into account, and I'm guessing that's your objective. If you do need to actually merge the outputs, then your observation about the `SKIP` signal object might in fact be a bug.
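Here is a rough plain-Python model of the gating task described above. The `State` enum and `gate_decision` function are hypothetical, not Prefect's API. The model assumes the gate is only evaluated once every upstream branch has finished, and that a skipped branch satisfies an `all_successful`-style trigger (to my understanding, in Prefect 1.x `Skipped` is a subclass of `Success`), while `skip_on_upstream_skip=False` keeps the gate itself from being skipped.

```python
from enum import Enum
from typing import Iterable


class State(Enum):
    """Hypothetical, simplified task states (not Prefect's State classes)."""
    PENDING = "pending"
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"


def gate_decision(upstream_states: Iterable[State]) -> str:
    """Hypothetical model of a gate task with an all_successful-style trigger
    and skip_on_upstream_skip=False."""
    states = list(upstream_states)
    if any(s is State.PENDING for s in states):
        return "wait"  # block until every conditional branch has finished
    if all(s in (State.SUCCESS, State.SKIPPED) for s in states):
        return "run"  # assumption: a skip counts as success for this trigger
    return "trigger_failed"  # at least one branch failed


# Three conditional branches: exactly one succeeded, the other two skipped.
print(gate_decision([State.SKIPPED, State.SUCCESS, State.SKIPPED]))
```

Because the gate only looks at states, adding a fourth conditional branch just means one more upstream dependency; no task signature changes.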
e
Hey @Jeremiah. I eventually gave in and converted my solution to use a `switch`. So I don't really need help with my scenario, but I'll explain in case anyone is curious 😅 Before, I had multiple (currently 3) tasks that conditionally skipped based on the same input. Their conditions, combined with the shared input, ensured that only one of them ran and all the others skipped. So, being the greedy perfectionist that I am, I wanted to avoid the `CompareValue` tasks the switch would generate. I went ahead and put the conditional logic into each task, and skipped based on the condition. Also, all of these are actually mapped tasks, and I really get confused when `switch` and mapping get mixed up 😅. Now, though, I'm liking the `switch` version better. All my conditional logic is centralized in the switch condition, my branch tasks are simpler, and it's easy to see that only one of those tasks is going to run just by glancing at the flow chart.
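For anyone curious why the `switch` version plays nicely with `merge`, here is a plain-Python model. `run_switch` and `first_not_none` are hypothetical helpers, not Prefect's API. The sketch assumes each case comparison plays the role of one `CompareValue` task, and that a non-matching branch is skipped because of its upstream, so (per the behavior described earlier in this thread) it hands `None` to the merge rather than a `SKIP` object.

```python
from typing import Any, Callable, Dict, Optional


def run_switch(
    condition: Any, cases: Dict[Any, Callable[[], Any]]
) -> Dict[Any, Optional[Any]]:
    """Hypothetical model of switch: run only the branch whose key matches."""
    results: Dict[Any, Optional[Any]] = {}
    for key, branch in cases.items():
        # Each comparison stands in for one CompareValue task. A non-matching
        # branch is skipped via its upstream and contributes None downstream.
        results[key] = branch() if condition == key else None
    return results


def first_not_none(*values: Any) -> Optional[Any]:
    """First-non-None rule, as described for Merge earlier in the thread."""
    return next((v for v in values if v is not None), None)


branch_outputs = run_switch(
    "b",
    {
        "a": lambda: "handled by case a",
        "b": lambda: "handled by case b",
        "c": lambda: "handled by case c",
    },
)
print(first_not_none(*branch_outputs.values()))
```

In this model all the conditional logic lives in one place (the `condition` compared against the case keys), which matches the centralization benefit described above.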
k
That’s good to know! @emre
e
As for `merge`'s behavior, it does feel weird. Still, it lives in the `control_flow` module, so I can't really be angry about it not working outside that scope 😅