Nate Atkins
05/17/2020, 1:22 AM
I've been working with the target parameter on tasks. This, in conjunction with the addition of the upstream edges on the trigger signature, almost got me the ability to rebuild the current task's target if the upstream was updated. I did a little trickery: if the current task's target needs to be rebuilt, I delete the target cache file and then raise a RETRY signal. When the task retries, it can't find the cache file and runs the task.
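A minimal sketch of that trick, assuming a Prefect 0.x-style custom trigger (the Dict[Edge, State] signature); the target path and the upstream_is_newer helper are illustrative, not part of Prefect:

from pathlib import Path
from prefect.engine.signals import RETRY

def upstream_is_newer(state, target):
    # Hypothetical freshness check: compare the upstream result file's
    # modification time against the current task's target file.
    loc = getattr(state._result, "location", None)
    return loc is not None and Path(loc).stat().st_mtime > target.stat().st_mtime

def rebuild_trigger(upstream_states):
    # upstream_states maps Edge -> State in newer Prefect 0.x triggers.
    target = Path("cache/current_task.pkl")  # the current task's target file (illustrative)
    if target.exists() and any(
        upstream_is_newer(state, target) for state in upstream_states.values()
    ):
        # Upstream was rebuilt: delete the stale cache and force a re-run.
        target.unlink()
        raise RETRY("Upstream updated; target cache removed, re-running task.")
    return all(state.is_successful() for state in upstream_states.values())

When the retried run finds no cache file, the task body executes and rewrites the target.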
The only problem I have is: if the upstream task didn't run and update, and the current task doesn't need to run, what do I raise/return from the trigger to get the task to use the cached results?
True: The task will run
False: The flow will fail
SUCCESS: No cached results to pass on to the next task.
Chris White
- with the result attribute you can simplify your code with the exists interface; moreover, if you find exists is True, you can use the result to read() from the target location and gather the data.
- finally, all Prefect Signals are capable of carrying result information; so in this case, you might do (extreme pseudocode):
for edge, state in upstream_states.items():
    data = state._result.read(target_location)
    ...
raise SUCCESS(result=data)
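A slightly fuller sketch of the exists()/read() pattern Chris describes, assuming a Prefect 0.x Result (e.g. LocalResult) where exists() returns a bool and read() returns a Result whose .value holds the data; make_cache_trigger and target_location are illustrative names, not Prefect API:

from prefect.engine.results import LocalResult
from prefect.engine.signals import SUCCESS

def make_cache_trigger(result, target_location):
    # Close over the task's configured Result and its rendered target path.
    def trigger(upstream_states):
        if result.exists(target_location):
            cached = result.read(target_location)  # returns a Result object
            raise SUCCESS(result=cached.value)     # hand the cached data downstream
        # No cache: fall back to the usual all-successful behaviour.
        return all(state.is_successful() for state in upstream_states.values())
    return trigger

cache_trigger = make_cache_trigger(LocalResult(dir="cache"), "my_task.pkl")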
Nate Atkins
05/17/2020, 1:59 PM
# Use the cached file.
with downstream_target.open("rb") as fp:
    data = cloudpickle.load(fp)
raise SUCCESS(result=data)
It would have taken me quite a while to find that solution.
target really simplified this. Great addition. Now I think I have the make semantics I've been searching for over the last few weeks. Now to try it out in some real flows.
Nate Atkins
05/17/2020, 2:08 PM
# Use the cached file.
result = edge.downstream_task.result
data = result.read(result.location)
raise SUCCESS(result=data)
Brad
05/17/2020, 9:36 PM
Nate Atkins
05/17/2020, 11:59 PM
The trigger leverages an extension I made to the Result to make results comparable. My LocalJsonResult implements __lt__ so the trigger can easily see if a cache is out of date and know that it needs to run the task that builds it.
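A rough illustration of such a comparable Result; this is not the actual LocalJsonResult, just a sketch that assumes freshness is judged by the backing file's modification time (the JSON serialization side is omitted):

import os
from prefect.engine.results import LocalResult

class ComparableResult(LocalResult):
    # Orders two results by the modification time of their backing files,
    # so a trigger can ask "is my cached target older than my upstream's output?"
    def __lt__(self, other):
        return os.path.getmtime(self.location) < os.path.getmtime(other.location)

# In the trigger, roughly: if my_result < upstream_result, the cache is stale,
# so delete it and raise RETRY.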
The outstanding piece is what to do if a task generates multiple targets. I don't see anything that keeps me from passing a list of filenames as target, as long as my trigger and result are smart enough to deal with a list of files.
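If target were a list of filenames, the same logic would just have to apply element-wise; a hypothetical fragment of how a trigger might treat a list of target files:

from pathlib import Path
from prefect.engine.signals import RETRY

def check_multiple_targets(targets, upstream_mtime):
    # `targets` is a list of cache file paths; the cache only counts as valid
    # if every file exists and is at least as new as the newest upstream output.
    paths = [Path(t) for t in targets]
    stale = [p for p in paths if not p.exists() or p.stat().st_mtime < upstream_mtime]
    if stale:
        for p in stale:
            if p.exists():
                p.unlink()  # drop partial/stale caches so the retry rebuilds them
        raise RETRY("One or more targets are stale; re-running task.")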
Let me know what you think about this. I still keep thinking that I'm somewhere in the weeds with my approach, but remember that data science workflows tend to be more incremental than an ETL workflow. I at least feel better that I've gotten all the weird negative engineering stuff out in just a couple of dozen lines of code in the trigger and result. It was all a giant mess in my tasks, or a bunch of extra tasks.