
Brad

04/23/2020, 11:12 PM
I do have one request actually related to caching - is there a plan for first-class support for exceptions?
From my perspective, if a task fails, I would love to be able to cache that exception and not try to rerun that task (with the same parameters) until, say, I’ve pushed a new version of my code or manually cleared that cache

Dylan

04/23/2020, 11:12 PM
hmmm

Brad

04/23/2020, 11:13 PM
The ability to push a code change, query an “exceptions” Result-like table for example, and rerun only those tasks/flows would be very powerful

Dylan

04/23/2020, 11:14 PM
So, if a flow run fails on a particular task run, a subsequent run wouldn’t attempt to run that particular task again until a change is made?
👍 1

Brad

04/23/2020, 11:14 PM
But some computations are long or costly - if I have a flow and rerun it, given the deterministic nature of tasks, I wouldn’t want to rerun tasks that have thrown exceptions in the past under the same version of the code

Dylan

04/23/2020, 11:15 PM
Interesting
I don’t think we have a concept for this now; AFAIK a task has to succeed for its result to be cached

Brad

04/23/2020, 11:16 PM
yep, I think this is the case

Dylan

04/23/2020, 11:16 PM
But it’s a very interesting feature request
I’ll pass it along to the team!

Brad

04/23/2020, 11:16 PM
thanks @Dylan

Dylan

04/23/2020, 11:16 PM
Actually
Would you mind opening an issue with some additional details about why you’re looking to cache the result?

Brad

04/23/2020, 11:17 PM
Will do

Dylan

04/23/2020, 11:17 PM
We’re in the process of implementing a new Result interface, which you can read about here: https://docs.prefect.io/core/PINs/PIN-16-Results-and-Targets.html#pin-16-results-and-targets
I’m wondering if Exception would be an interesting result type

David Ojeda

04/24/2020, 3:04 PM
We have an approach for failed tasks that succeed (yes, it sounds weird) for cases where we know the incoming data may raise a particular exception and will always fail but we don’t want the entire flow to fail. For example, when doing a map over files, we want to continue even if some files could not be processed due to a format error. To achieve this, we have a new state and signal:
from prefect.engine.state import Success
from prefect.engine.signals import PrefectStateSignal

class GracefulFail(Success):
    # a "soft" failure that still counts as a Success for downstream triggers
    color = '#dd1c77'

class GRACEFULFAIL(PrefectStateSignal):
    _state_cls = GracefulFail
and we have our task do
raise GRACEFULFAIL('what happened')
However, downstream tasks need to be able to handle an upstream task that failed “gracefully”. It is possible to set some default results on the Prefect state, so we put our “default results” on it and raise the signal.
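A minimal sketch of what raising that signal with a “default result” might look like inside a task, assuming the GracefulFail/GRACEFULFAIL definitions above and that the signal forwards keyword arguments such as result to the state it creates (as Prefect Core’s state signals do); this is an illustration, not David’s actual code:

import pandas as pd
from prefect import task

@task
def clean(file: str) -> pd.DataFrame:
    try:
        return pd.read_csv(file)
    except pd.errors.ParserError:
        # known/expected problem: soft-fail, but attach an empty frame as the
        # "default result" so downstream tasks still receive a DataFrame
        raise GRACEFULFAIL(f"could not parse {file}", result=pd.DataFrame())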

Brad

04/24/2020, 11:46 PM
@David Ojeda interesting thoughts - I still like the idea of tasks emitting the proper Failed task, as two different downstream tasks may want to handle the error in different ways - it should be up to the trigger of the downstream task to decide what to do.
As a simple example, suppose I have a task that computes a moving average of something over some parametrised number of days - As you mention, I may not want my task to fail if I have a single broken day of input data if my number of days is say, 30, but I may want it to fail if I am doing a two day moving average. So implementing this in the downstream task makes sense to me (say with some percentage threshold for the upstream tasks passing)
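For reference, Prefect Core already ships percentage-based triggers; a hedged sketch of this 30-day example using prefect.triggers.some_successful could look like the following (fetch_price is a hypothetical loader, and the 0.9 threshold is an arbitrary choice):

from prefect import task, Flow
from prefect.triggers import some_successful

@task
def load_day(day: int) -> float:
    return fetch_price(day)  # hypothetical loader; may raise for a broken day

# run only if at least 90% of the mapped upstream runs succeeded;
# for a two-day window you would tighten this (or use all_successful)
@task(trigger=some_successful(at_least=0.9))
def moving_average(values: list) -> float:
    # failed upstream runs may pass their exception through, so keep numbers only
    good = [v for v in values if isinstance(v, (int, float))]
    return sum(good) / len(good)

with Flow("moving-average") as flow:
    days = list(range(30))
    values = load_day.map(days)
    avg = moving_average(values)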

David Ojeda

04/25/2020, 10:16 AM
I guess it all depends on what works for you and your team. On our team, we decided to distinguish between “soft” and “hard” fails. Soft fails are for cases where we encounter a known problem that should not stop the whole flow; this is handled with the state I posted before. Hard fails are for cases where there is an unexpected problem (ValueErrors, RuntimeErrors, or something that we might have overlooked in our implementation). We also use them for checking postconditions of our tasks: if a postcondition is not met (e.g. the output follows a certain standard, or is not None, etc.), downstream tasks may fail. We also have a custom task that derives from prefect.Task and wraps the run method so we can raise expected (related to soft fails) or unexpected (related to hard fails) exceptions. This custom task catches the expected exceptions and wraps them in a state and an ENDRUN signal. On our team, we have an example similar to yours. It is a flow like this:
from prefect import Flow

# query, clean, process, and gather are tasks defined elsewhere
with Flow('example') as flow:
    files = query()
    clean_files = clean.map(file=files)
    features = process.map(file=clean_files)
    summary = gather(features)
The clean task takes a file, extracts a signal and does some filtering. It can happen that the signal is saturated. When >10% of the samples are saturated, we soft fail. In that case, we raise a soft fail with an “empty” result that is usually an empty dataframe; the process and gather tasks are coded to handle dataframes as inputs and can either soft fail with an empty input or just generate an empty output, because they are designed to work with empty inputs as well.
On the other side, when clean encounters a corrupted file, it will hard fail. This will stop the flow execution after the clean DAG node and will result in some very visible status in the UI or in our logs. This is what we want: we will either remove this file from our data source, or remove it from the query. In some cases, clean will encounter an unexpected exception, due to human error when coding that function. This is also something that we want to be very visible so we can fix it as soon as possible.
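A rough sketch of how such a clean task could split soft and hard fails, reusing the GRACEFULFAIL signal defined earlier; the "signal" column name, FULL_SCALE value, and 10% threshold are assumptions for illustration and not the actual implementation:

import pandas as pd
from prefect import task

FULL_SCALE = 5.0        # assumed sensor full-scale value
SATURATION_LIMIT = 0.10  # soft-fail when more than 10% of samples saturate

@task
def clean(file: str) -> pd.DataFrame:
    df = pd.read_csv(file)       # a corrupted file raises here -> hard fail
    signal = df["signal"]        # assumed column name
    saturated = (signal.abs() >= FULL_SCALE).mean()
    if saturated > SATURATION_LIMIT:
        # known problem: soft-fail with an empty frame so process/gather keep going
        raise GRACEFULFAIL(f"{file}: {saturated:.0%} saturated", result=pd.DataFrame())
    return df
    # any unexpected exception (e.g. a bug in this function) simply propagates
    # and hard-fails the flow, which is the very visible behaviour described above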