Nate Atkins
05/17/2020, 1:22 AM
The target parameter on tasks, together with the addition of upstream edges to the trigger signature, almost gets me the ability to rebuild the current task's target when an upstream is updated. I did a little trickery: if the current task's target needs to be rebuilt, I delete the target cache file and then raise a RETRY signal. When the task retries, it can't find the cache file and so runs the task.
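A minimal sketch of that trick, assuming Prefect's edge-keyed trigger signature and a local target file; TARGET_PATH and the mtime-based staleness check are illustrative stand-ins, not Prefect API:
import os
from prefect.engine.signals import RETRY

TARGET_PATH = "/data/cache/current_task.json"  # illustrative location

def upstream_is_newer(state):
    # Placeholder staleness check using file mtimes; the real check
    # compares Results directly (see later in the thread).
    location = getattr(state._result, "location", None)
    return bool(location) and os.path.getmtime(location) > os.path.getmtime(TARGET_PATH)

def rebuild_on_upstream_update(upstream_states):
    # upstream_states maps each upstream Edge to its finished State.
    if os.path.exists(TARGET_PATH) and any(
        upstream_is_newer(state) for state in upstream_states.values()
    ):
        os.remove(TARGET_PATH)           # delete the stale target file...
        raise RETRY("Upstream updated")  # ...so the retry rebuilds it
    return True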
The only problem I have: if the upstream task didn't run and update, and the current task doesn't need to run, what do I raise/return from the trigger to get the task to use the cached results?
True: the task will run.
False: the flow will fail.
SUCCESS: there are no cached results to pass on to the next task.
Chris White
With the result attribute you can simplify your code with the exists interface; moreover, if you find exists is True, you can use the result to read() from the target location and gather the data.
- finally, all Prefect Signals are capable of carrying result information; so in this case, you might do (extreme pseudocode):
for edge, state in upstream_states.items():
    data = state._result.read(target_location)
    ...
raise SUCCESS(result=data)
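A slightly more concrete version of that pseudocode, still a sketch: it assumes the trigger can reach the downstream task's configured Result through any upstream edge, and that result.location is already resolved to the target path.
from prefect.engine.signals import SUCCESS

def use_cached_target(upstream_states):
    # Every key is an Edge into the guarded task, so any edge's
    # downstream_task carries the Result configured for the target.
    edge = next(iter(upstream_states))
    result = edge.downstream_task.result
    if result.exists(result.location):
        # The target file is there: read it and let SUCCESS carry the
        # cached value on to downstream tasks.
        raise SUCCESS(result=result.read(result.location))
    return True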
Nate Atkins
05/17/2020, 1:59 PM
# Use the cached file.
with downstream_target.open("rb") as fp:
    data = cloudpickle.load(fp)
raise SUCCESS(result=data)
It would have taken me quite a while to find that solution. The target parameter really simplified this. Great addition. Now I think I have the make semantics I've been searching for over the last few weeks. Now to try it out in some real flows.
Nate Atkins
05/17/2020, 2:08 PM
# Use the cached file.
result = edge.downstream_task.result
data = result.read(result.location)
raise SUCCESS(result=data)
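For completeness, a trigger like that gets attached where the task is defined. This wiring is a sketch: the task name and paths are illustrative, and local runs may also need checkpointing enabled.
from prefect import task
from prefect.engine.results import LocalResult

@task(
    target="current_task.json",             # filename for the cached target
    result=LocalResult(dir="/data/cache"),  # where the target is persisted
    trigger=use_cached_target,              # custom trigger from above
)
def build_report(upstream_data):
    ...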
Brad
05/17/2020, 9:36 PM
Nate Atkins
05/17/2020, 11:59 PM
The trigger leverages an extension I made to the Result to make results comparable. My LocalJsonResult implements __lt__ so the trigger can easily see if a cache is out of date and know that it needs to run the task that builds it.
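The thread doesn't show the class itself, so as a guess at its shape: a Result subclass whose __lt__ compares target file mtimes (the actual comparison inside LocalJsonResult may well differ).
import os
from prefect.engine.results import LocalResult

class LocalJsonResult(LocalResult):
    # Comparable Result sketch: "older file" sorts first, so a trigger
    # can ask whether its cached target predates an upstream's result.
    def __lt__(self, other):
        return os.path.getmtime(self.location) < os.path.getmtime(other.location)
With that in place, a trigger can write checks like edge.downstream_task.result < state._result to decide whether the cache is stale.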
The outstanding piece I have is a task that generates multiple targets. I don't see anything that keeps me from passing a list of filenames as target, as long as my trigger and result are smart enough to deal with a list of files (rough sketch below).
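One speculative way the list version could look, reusing the raise-SUCCESS convention from earlier in the thread; the factory shape and passing the file list along as the result are assumptions:
import os
from prefect.engine.signals import SUCCESS

def multi_target_trigger(targets):
    # Factory: returns a trigger closed over a list of target paths.
    def trigger(upstream_states):
        if all(os.path.exists(t) for t in targets):
            # Everything is already built, so skip the task and hand
            # the file list along as the cached "result".
            raise SUCCESS(result=targets)
        return True  # at least one file is missing: run the task
    return trigger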
Let me know what you think about this. I still keep thinking that I'm somewhere in the weeds with my approach, but remember that data science workflows tend to be more incremental than ETL workflows. I at least feel better that I've gotten all the weird negative engineering stuff out in just a couple dozen lines of code in the trigger and result. Before, it was all a giant mess in my tasks, or a bunch of extra tasks.