
Sam Peck

02/04/2021, 12:12 AM
Hi gang, I’m having a confusing time dealing with inconsistent behavior in how failed results interact with .map. When I run a flow like this:
with Flow(f"test flow|) as flow:
  values = [1,2,3]
  task_1_results = task1.map(values)
  task_2_results = task2.map(task_1_results)
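(task1 and task2 aren’t shown here -- for illustration, a pair of tasks along these lines would reproduce what I’m describing:)

from prefect import task
from prefect.engine.signals import FAIL

@task
def task1(x):
    # Illustrative failure condition only, not my actual logic
    if x == 2:
        raise FAIL(f"task1 failed for input {x}")
    return x * 10

@task
def task2(x):
    return x + 1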
Any task1 results that raised a FAIL signal don’t get run by task2; instead, task2 aborts early with "Finished task run for task with final state: 'TriggerFailed'".
However when I change this slightly to introduce a filter of some kind:
my_filter = FilterTask()

with Flow("test flow") as flow:
    values = [1, 2, 3]
    task_1_results = my_filter(task1.map(values))
    task_2_results = task2.map(task_1_results)
All task1 results get mapped onto task2, including those that failed. I get the sense that I’ve misunderstood the functional API and that’s what’s tripping me up; however, I haven’t found anything in the docs that unlocks my intuition about why it behaves this way or how to modify that behavior. My first thought was to modify the filter to look out for failed results, but from stepping through in my debugger it doesn’t look like the trigger function gets any of that context.

Chris White

02/04/2021, 12:40 AM
Hi @Sam Peck - the behavior here isn’t so much about mapping but rather triggers; triggers are attached to individual tasks and encapsulate the logic for whether that task should be allowed to run, given all of its upstream states. The default trigger in Prefect is all_successful, which means that if any upstream task run is not both finished and successful, the downstream task will enter a TriggerFailed state, as you’re seeing here. The FilterTask is special in that it uses an all_finished trigger, which allows it to run regardless of whether the upstreams succeeded or not (as long as they finished). This allows the filter task to filter out exceptions with no special config. I’m guessing here, but you might be looking to override the trigger on task2 so that it still runs if its upstreams fail.
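For example (a minimal sketch, assuming task2 is defined with the @task decorator as in your snippet):

from prefect import task
from prefect.triggers import all_finished

@task(trigger=all_finished)
def task2(x):
    # With an all_finished trigger this runs even when the paired upstream
    # mapped child failed, so x may be an exception rather than a value and
    # the task body would need to handle that case itself.
    return x + 1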

Sam Peck

02/04/2021, 1:19 AM
Thanks for the detailed response @Chris White! I’m using the filters to act as the conditional logic within my workflow. The way I’m thinking about my “separation of concerns” strategy is that task failures are best handled by Prefect’s built-in states and signals functionality--but I’d like to filter successful runs for downstream mapped tasks. (Side note: LMK if this is considered poor practice with Prefect--this is my first non-trivial flow, so I’m figuring this out as I go.) With the above architecture in mind, I’ve updated my filter with trigger=all_successful. This seems to have the desired effect of only routing successful tasks to the filter. That said… I’m not certain from the documentation how upstream_states interacts with this trigger when I’m using .map. My guess would be that when I set all_successful on a task within a flow with multiple mapped tasks, the “all” being evaluated is the path from the current node in the DAG back to the first node in the DAG, even through any fan-out/fan-in operations involved in handling mapped tasks. Am I thinking about this correctly?
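(Concretely, the override I’m describing is just this -- assuming FilterTask from prefect.tasks.control_flow:)

from prefect.tasks.control_flow import FilterTask
from prefect.triggers import all_successful

# Override FilterTask's default all_finished trigger with all_successful
my_filter = FilterTask(trigger=all_successful)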

Chris White

02/04/2021, 2:57 PM
Hey @Sam Peck - apologies for not seeing this last night! It does sound like you’re thinking about it correctly - when using .map multiple times as you are doing here, each downstream mapped task will only evaluate its trigger on the immediate upstream from the previous mapped level. So the rules are:
• if you have a “reduce” task, the trigger will be evaluated on all upstream states, including individual mapped tasks (in your example, the filter task is a reduce task)
• if you have multiple levels of .map calls, the trigger will be evaluated only on the immediate upstreams, so that other branches of the mapped pipeline run independently (sketched below)
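Here’s a small sketch of those two rules; the task names are just illustrative, not your actual tasks:

from prefect import Flow, task
from prefect.engine.signals import FAIL

@task
def task_a(x):
    if x == 2:
        raise FAIL("child for x=2 fails")  # one mapped child fails
    return x

@task
def task_b(x):
    # Mapped over task_a's children one-to-one: the child paired with the
    # failed task_a(2) ends up TriggerFailed, but the x=1 and x=3 branches
    # still run because each child only checks its immediate upstream.
    return x * 10

@task
def reduce_all(results):
    # A "reduce" step: its trigger is evaluated against all upstream mapped
    # states, so with the default all_successful trigger it would end up
    # TriggerFailed here because one upstream child did not succeed.
    return sum(results)

with Flow("mapped-trigger-sketch") as flow:
    a = task_a.map([1, 2, 3])
    b = task_b.map(a)
    total = reduce_all(b)

# flow.run() would show the state transitions described above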