Brian McFeeley

08/01/2019, 7:27 PM
I have a bit of a philosophical question about chaining mapped tasks to produce parallel pipelines. Let's imagine a simple flow of tasks like:
items = get_list(...)
t1 = transformation.map(items)
t2 = transformation2.map(t1)
...
Say we expect, due to some data noise, a low but nonzero chance that one of the steps late in the series of transformations fails, and only for certain subsets of the data. From a business-logic perspective, what we'd like is: every success happens as it happens, and individual task failures don't torpedo the whole pipeline; we just do some failure handling (for example, quarantining the input data in a separate S3 bucket for analysis and notifying the data engineers). The run is still a "success" in business terms if a large majority (or plurality) of tasks complete, but we do want to act on the failures, either by creating new test cases or by finding better filters for dirty data. I've seen the docs on state transition handling, but I'm wondering how to use that in the context of a larger flow to both 1. get a batch-wide view of how often tasks transition into an error state and 2. avoid marking the whole pipeline as failed, unless maybe we exceed some configurable error-rate threshold. Does this run counter to y'all's programming model?
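To make the first ask (a batch-wide count of failures, without one bad item stopping the rest) concrete, here is a plain-Python sketch of a chained map. It is not Prefect code and makes no claim about how Prefect schedules mapped tasks; `run_chained_map` is a hypothetical helper that runs each item through the steps in order, skips only that item's remaining steps when one fails, and tallies failures per step.

```python
from collections import Counter
from typing import Any, Callable, List, Tuple


def run_chained_map(
    items: List[Any], steps: List[Tuple[str, Callable[[Any], Any]]]
) -> Tuple[List[Any], Counter]:
    """Hypothetical helper: map each item through `steps` in order.

    A failure at one step skips only that item's remaining steps; the other
    items keep going. Returns the finished results and a per-step failure count.
    """
    results: List[Any] = []
    failures: Counter = Counter()
    for item in items:
        value = item
        for name, fn in steps:
            try:
                value = fn(value)
            except Exception:
                failures[name] += 1  # record which step broke for this item
                break  # downstream steps for this item are skipped
        else:
            results.append(value)  # only reached if no step failed
    return results, failures
```

For instance, mapping `["1", "2", "x", "8"]` through `int` and then `lambda n: 10 // (n - 2)` yields the results `[-10, 1]` and one failure recorded against each step, which is the kind of batch-wide incident count asked about in point 1.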
Maybe the failure handler could emit a metric to one of our monitoring solutions and rely on their native counting, instead of demanding that of the pipeline code.
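A sketch of that idea: the failure handler only increments a named counter and passes the state through, leaving aggregation and alerting to the monitoring system. `InMemoryMetrics` and `failure_metric_handler` are hypothetical; the counter stands in for a real metrics client, and the states are plain strings rather than Prefect `State` objects. The `(task, old_state, new_state)` shape is only meant to echo the state-handler idea from the docs, so check the actual handler signature there before adapting it.

```python
from collections import Counter
from typing import Dict, Optional


class InMemoryMetrics:
    """Hypothetical stand-in for a monitoring client's counter API."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def increment(self, name: str, tags: Optional[Dict[str, str]] = None) -> None:
        # Fold tags into the key, e.g. "pipeline.task_failed|task=transformation2"
        key = name
        if tags:
            key += "|" + ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
        self.counts[key] += 1


metrics = InMemoryMetrics()


def failure_metric_handler(task_name: str, old_state: str, new_state: str) -> str:
    """Emit one counter increment per transition into a failed state."""
    if new_state == "Failed" and old_state != "Failed":
        metrics.increment("pipeline.task_failed", tags={"task": task_name})
    return new_state  # pass the new state through unchanged
```

The pipeline never computes an error rate itself here; the monitoring side can divide the failure counter by a matching success or attempt counter and alert on whatever threshold is configured there.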

Brett Papineau

08/01/2019, 8:35 PM
This is something we are looking at too. We're pushing every piece of data we can from each Prefect flow.run() into our data warehouse, which should let us act on it with whatever method is appropriate later on. We don't want flows to fail unless absolutely necessary either.

Brian McFeeley

08/01/2019, 8:45 PM
Thanks for your $.02!

Chris White

08/02/2019, 1:08 PM
Hey @Brian McFeeley, first off, this absolutely fits within our programming model. This might be worth a deeper dive, but I can make a few bite-sized remarks here:
- The mapped pipeline itself won't get mucked up by the failure of a single task within it; of course, with default settings, anything downstream of that particular task won't run.
- I think at this exact moment state transition handling might be the way to cache inputs upon failure. There might be a new feature lurking in here that we could work on together, but off the shelf I'd recommend a `try` / `except` block which, if an error is caught, raises a Prefect `FAIL` signal with the inputs as the "result", so your state handler can ship them off somewhere.
- Going back to "default settings": I think playing around with triggers might be useful here. For example, we have some `some_failed`-type triggers (https://docs.prefect.io/api/unreleased/triggers.html#prefect-triggers-some-failed) that only run downstream tasks if a certain proportion of upstream tasks fail or succeed.
- Similarly, you could have a terminal task with the appropriate `some_failed` trigger, which will determine the flow state accordingly. https://github.com/PrefectHQ/prefect/issues/775 also sounds like it could help you.
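The pattern above, sketched in plain Python so it runs without Prefect installed: a mapped step wraps its work in `try`/`except` and re-raises a failure that carries the original input, a handler step ships that input to a quarantine list (standing in for an S3 bucket), and a terminal check loosely modelled on a `some_failed`-style trigger decides the overall outcome. `FailWithInputs`, `QUARANTINE`, `run_mapped`, and `terminal_state` are all hypothetical names. In real Prefect code the exception role is played by the `FAIL` signal and the quarantine step by a state handler, and the exact trigger semantics (counts versus fractions) should be checked against the triggers docs.

```python
from typing import Any, Callable, List


class FailWithInputs(Exception):
    """Hypothetical stand-in for raising a FAIL signal with the inputs as its result."""

    def __init__(self, message: str, result: Any) -> None:
        super().__init__(message)
        self.result = result


QUARANTINE: List[Any] = []  # stands in for a quarantine bucket in S3


def transformation2(record: dict) -> dict:
    try:
        return {**record, "ratio": record["num"] / record["den"]}
    except (KeyError, ZeroDivisionError) as exc:
        # Attach the offending input so the handler can ship it off.
        raise FailWithInputs(f"bad record: {exc!r}", result=record) from exc


def run_mapped(fn: Callable[[Any], Any], items: List[Any]) -> List[str]:
    """Run fn over each item independently; return one state label per item."""
    states: List[str] = []
    for item in items:
        try:
            fn(item)
            states.append("Success")
        except FailWithInputs as failure:
            QUARANTINE.append(failure.result)  # the "state handler" step
            states.append("Failed")
    return states


def terminal_state(states: List[str], at_most: float) -> str:
    """Fail the run only if the failed fraction exceeds at_most."""
    failed = states.count("Failed") / len(states) if states else 0.0
    return "Success" if failed <= at_most else "Failed"
```

With the records `{"num": 1, "den": 2}`, `{"num": 3, "den": 0}`, `{"num": 5}` and `{"num": 4, "den": 4}`, the middle two end up in `QUARANTINE`, half of the items fail, and `terminal_state(states, at_most=0.5)` still reports `"Success"`, while a stricter `at_most=0.25` reports `"Failed"`. Keeping the threshold in one terminal step means the error-rate policy is configurable in a single place rather than spread across the mapped tasks.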

Brian McFeeley

08/02/2019, 3:26 PM
This is really great stuff! Will start cobbling it together soon.

Jeremiah

08/02/2019, 5:58 PM
To echo: not only is all of that within our programming model, but if we aren't making it easy yet, we will.

Chris White

08/05/2019, 9:58 PM
@Marvin archive “Large scale mapping where some failures are allowed”