Ohhhhh so close. I was working with the new `tar...
# prefect-community
n
Ohhhhh so close. I was working with the new
target
parameter on tasks. This in conjunction with the addition of the upstream
edges
on the trigger signature almost got me the ability to rebuild the current task target if the upstream was updated. I did a little trickery if the current task target needs to be rebuilt, I delete the target cache file and then raise a RETRY signal. When the task retries it can't find the cache file runs the task. The only problem I have is that if the upstream task didn't run and update, and the current task doesn't need to run - what do I raise/return from the trigger to get the task to use the cached results? True: The task will run False: The flow will fail SUCCESS: No cached results to pass on to the next task.
πŸ‘€ 2
πŸ‘ 2
c
Hey Nate! Thanks for sharing β€” this seems like a cool pattern! Two things: - if you use the downstream task’s
result
attribute you can simplify your code with the
exists
interface; moreover, if you find
exists
is
True
, you can use the
result
to
read()
from the target location and gather the data. - finally, all Prefect Signals are capable of carrying result information; so in this case, you might do (extreme psuedocode):
Copy code
for edge, state in upstream_states.items():
    data = state._result.read(target_location)
...
raise SUCCESS(result=data)
n
Thanks Chris, Modified the return at the end to use the cached data.
Copy code
# Use the cached file. 
with downstream_target.open("rb") as fp:
    data = cloudpickle.load(fp)
raise SUCCESS(result=data)
It would have taken my quite a while to find that solution.
target
really simplified this. Great addition. Now I think I have the
make
semantics I've been searching for over the last few weeks. Now to try it out in some real flows.
πŸš€ 1
event better.
Copy code
# Use the cached file.
result = edge.downstream_task.result
data = result.read(result.location)
raise SUCCESS(result=data)
πŸ’― 1
b
Hey @Nate Atkins would you mind sharing your final implementation?
n
Phew, that's been a productive couple of days where I learned a lot about how Prefect is working. The
trigger
leverages an extension I made to the
Result
to make results comparable. My
LocalJsonResult
implements
__lt__
so the trigger can easily see if a cache is out of date and know that it needs to run the task that builds it. The outstanding piece I have is if a task generates multiple targets. I don't see anything that keeps me from passing a list of filenames as
target
as long as my
trigger
and
result
are smart enough to deal with a list of files. Let me know what you think about this. I still keep thinking that I'm somewhere in the weeds on my approach, but remember that data science workflows tend to me more incremental than an ETL workflow. I at least feel better that I've gotten all the weird negative engineering stuff out in just a couple of dozen lines of code int the
trigger
and
result
. It was all a giant mess in my tasks or a bunch of extra tasks.