Sam Peck 02/04/2021, 12:12 AM
When I run a flow like this:

```python
with Flow("test flow") as flow:
    values = [1, 2, 3]
    task_1_results = task1.map(values)
    task_2_results = task2.map(task_1_results)
```

Any task1 results that raised a `FAIL` signal don't get run by task2; instead task2 aborts early with `Finished task run for task with final state: 'TriggerFailed'`.

However, when I change this slightly to introduce a filter of some kind:

```python
my_filter = FilterTask()

with Flow("test flow") as flow:
    values = [1, 2, 3]
    task_1_results = my_filter(task1.map(values))
    task_2_results = task2.map(task_1_results)
```

all task1 results get mapped onto task2, including those that failed. I get the sense that I've misunderstood the functional API and that's what's tripping me up, but I haven't found anything in the docs that unlocks my intuition about why it behaves this way and how to modify that behavior. My first thought was to modify the filter to look out for failed results, but from stepping through in my debugger it doesn't look like the trigger function gets any of that context.
The default trigger on a task is `all_successful`, which means that if any upstream task run is not both finished and successful, the downstream task will enter a `TriggerFailed` state, as you're seeing here. The `FilterTask` is special in that it uses an `all_finished` trigger, which allows it to run regardless of whether the upstreams succeeded or not (as long as they finished). This lets the filter task filter out exceptions with no special config. I'm guessing here, but you might be looking to override the trigger on task2 so that it still runs even if its upstreams fail.
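To make the explanation above concrete, here is a minimal sketch of the trigger semantics being described — a simplified model, not the real prefect internals. The state strings and function bodies are illustrative assumptions; in Prefect 1.x itself the equivalents live in `prefect.triggers`, and a task's trigger can be set when it is defined (e.g. `@task(trigger=all_finished)` — check the docs for your version).

```python
# Simplified model of Prefect 1.x trigger semantics (illustrative, not library code):
# a trigger inspects the states of a task's upstream runs and decides whether
# the downstream task is allowed to run.

def all_successful(upstream_states):
    """Default trigger: every upstream must have finished successfully."""
    return all(s == "Success" for s in upstream_states)

def all_finished(upstream_states):
    """FilterTask's trigger: upstreams may have failed, as long as they finished."""
    return all(s in ("Success", "Failed") for s in upstream_states)

# Suppose task1.map([1, 2, 3]) produced one failed child:
task1_states = ["Success", "Failed", "Success"]

# With the default trigger, the downstream task enters TriggerFailed and never runs...
print(all_successful(task1_states))  # False -> TriggerFailed

# ...while a FilterTask (all_finished trigger) still runs and can drop the exceptions.
print(all_finished(task1_states))    # True -> filter runs
```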
Sam Peck 02/04/2021, 1:19 AM
I'm not really after the filter functionality--but I'd like to filter down to the successful runs for downstream mapped tasks. (Side note: LMK if this is considered poor practice with prefect--this is my first non-trivial flow, so I'm figuring this out as I go.) With the above architecture in mind, I've updated my filter, and this seems to have the desired effect of only routing successful tasks to the filter. That said, I'm not certain from the documentation how triggers interact with `map`. My guess would be that when I set a trigger on a task within a flow with multiple mapped tasks, the upstream states being evaluated cover the whole path from the current node in the DAG back to the first node, even through any fan-out/fan-in operations involved in handling mapped tasks. Am I thinking about this correctly?
When you call `.map` multiple times as you are doing here, each downstream mapped task will only evaluate its trigger on the immediate upstream from the previous mapped level. So the rules are:
• if you have a "reduce" task, the trigger will be evaluated on all upstream states, including individual mapped task runs (in your example, the filter task is a reduce task)
• if you have multiple levels of `.map` calls, the trigger will be evaluated only on the immediate upstreams, so that other branches of the mapped pipeline run independently
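The two rules above can be sketched with the same toy trigger model — again an illustrative assumption, not prefect code: each child of a mapped level pairs up with its immediate upstream child, while a reduce task sees every upstream child state at once.

```python
# Toy illustration of the two trigger-evaluation rules for mapped pipelines
# (simplified model; real Prefect state handling is more involved).

def run_mapped_level(upstream_child_states):
    """Rule 2: each child of the next mapped level only checks its own
    immediate upstream child, so branches run independently."""
    return [
        "Runs" if s == "Success" else "TriggerFailed"
        for s in upstream_child_states
    ]

def run_reduce(upstream_child_states):
    """Rule 1: a reduce task evaluates its trigger on ALL upstream child
    states at once (here, the default all_successful trigger)."""
    ok = all(s == "Success" for s in upstream_child_states)
    return "Runs" if ok else "TriggerFailed"

task1_states = ["Success", "Failed", "Success"]

# task2.map(task_1_results): only the branch with the failed upstream
# trigger-fails; the other branches still run.
print(run_mapped_level(task1_states))  # ['Runs', 'TriggerFailed', 'Runs']

# A reduce task (like the FilterTask call) with an all_successful trigger
# would trigger-fail outright, because one mapped child failed.
print(run_reduce(task1_states))        # 'TriggerFailed'
```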