oliland (09/07/2019, 2:46 PM)
One feature I'm interested in understanding better is how to manage data lineage / files downloaded from the internet. Let's say I have a versioned bucket on S3, and I want tasks to run if 1) the version hasn't been run before or 2) downstream tasks haven't succeeded for that data version before. I wonder if this is achievable in Prefect (I understand it evolved from Airflow, which relies on execution dates for managing state).

Jeremiah (09/07/2019, 2:51 PM)
Very interesting use case. There are a couple of ways we could tackle this. One approach might be to adapt Prefect’s caching mechanism (https://docs.prefect.io/guide/examples/cached_task.html) to cache task results. The idea is that you’d have a task in your flow that returns the current bucket version, and you’d create a `cache_validator` that invalidates the cache whenever that version changes. Then you’d run the flow on a regular schedule, and as long as the cache was valid, all tasks would simply skip.
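
A minimal sketch of this caching approach, assuming Prefect 0.x-era APIs. The bucket/key names are placeholders, the boto3 version check is just one possible implementation, and the built-in `all_inputs` validator stands in for the custom `cache_validator`, since the version is the cached task's only input:

```python
import datetime

import boto3
from prefect import Flow, task
from prefect.engine.cache_validators import all_inputs

@task
def current_bucket_version(bucket: str, key: str) -> str:
    # Read the VersionId of the newest revision of an object
    # in a versioned S3 bucket.
    return boto3.client("s3").head_object(Bucket=bucket, Key=key)["VersionId"]

# Cache the expensive work for a long window; all_inputs invalidates
# the cache as soon as the version input changes.
@task(cache_for=datetime.timedelta(days=365), cache_validator=all_inputs)
def process(version: str) -> str:
    ...  # download and process the data for this version
    return version

with Flow("versioned-etl") as flow:
    process(current_bucket_version("my-bucket", "data.csv"))
```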
Another approach would be to use an external state (possibly the S3 bucket itself!) to record the version of the last successful run, and have a task at the start of your flow check that version against the current version. If they match, the task could raise `prefect.engine.signals.SKIP()` and skip the workflow.
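
A rough sketch of the skip approach; `last_successful_version()` is a hypothetical stub standing in for whatever external state records the last successful run:

```python
from prefect import task
from prefect.engine import signals

def last_successful_version() -> str:
    # Hypothetical stub: in practice, read a marker object that the
    # final task of the last successful run wrote to the S3 bucket.
    return "v1"

@task
def check_version(current_version: str) -> str:
    if current_version == last_successful_version():
        # SKIP propagates downstream by default, so the rest of the
        # flow short-circuits when nothing has changed.
        raise signals.SKIP("this version has already been processed")
    return current_version
```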
But what I’d like to recommend is something that we’re going to be taking up probably next quarter, which is event-driven flows. The flow could fire whenever the version changed (or a file hit S3, or any observable pattern). We’ve written a proposal here: https://docs.prefect.io/guide/PINs/PIN-08-Listener-Flows.html
When @Chris White is available he’ll probably have a stronger opinion about which one of these is better

oliland (09/07/2019, 2:57 PM)
There are definitely trade-offs to be made here. For example, let's say you receive a new data version, but one of the downstream tasks fails when processing it. Now you're stuck with a workflow that will always skip, as the download succeeded but the rest of the workflow didn't.

Jeremiah (09/07/2019, 2:59 PM)
The nice thing about the caching approach is it only caches successful runs
So you can run the flow over and over and never repeat a success

oliland (09/07/2019, 3:00 PM)
Oh! I was not expecting that - definitely makes things more interesting...

Jeremiah (09/07/2019, 3:00 PM)
Yeah, it’s a very cool mechanism
Chris is the expert on it though so I don’t want to run too far ahead 😉

oliland (09/07/2019, 5:15 PM)
One way I've seen this done in other systems is by making "assets" a first-class citizen: tasks accept assets as parameters and can return other assets. With assets, tasks are never "skipped" per se; instead they just keep a cache of `assets[input_checksums] = output_checksums_and_paths` and can use cached assets where available.
I guess this is more useful for compiling files and doesn't translate well into the world of databases. But Prefect seems modular enough to support assets as parameters in theory.

Jeremiah (09/07/2019, 5:21 PM)
Yup, I’ve seen systems that try to do this; usually, it’s systems that don’t support dataflow except under tightly controlled circumstances. One of our goals with Prefect was to let users build whatever system they required. In this case, Prefect will let you pass any Python object between any tasks, so you could reimplement what you’ve described with your own `Asset` class that your tasks consume and produce. You could even write a `Task` subclass that automatically checks any `Asset` inputs and raises a signal appropriately, so you wouldn’t even need the boilerplate.
In fact I’d accept a PR for that 😉
Ah, with a caveat: you’d need to store the assets in a known place, and we try to avoid forced state in Prefect. We’d probably try to reuse the cache mechanism.
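
A sketch of what that `Asset` class and `Task` subclass might look like; everything here is hypothetical, and the class-level cache is exactly the process-local "forced state" caveat mentioned above:

```python
import hashlib
from dataclasses import dataclass

from prefect import Task
from prefect.engine import signals

@dataclass(frozen=True)
class Asset:
    """A path plus a checksum of its contents."""
    path: str
    checksum: str

    @classmethod
    def from_file(cls, path: str) -> "Asset":
        with open(path, "rb") as f:
            return cls(path, hashlib.sha256(f.read()).hexdigest())

class AssetTask(Task):
    """Skips itself when the input Asset is unchanged since the last
    successful run. The cache only survives within a single process."""

    _cache: dict = {}

    def run(self, asset: Asset) -> Asset:
        if asset.checksum in self._cache:
            raise signals.SKIP("input asset unchanged")
        output = self.build(asset)
        self._cache[asset.checksum] = output
        return output

    def build(self, asset: Asset) -> Asset:
        raise NotImplementedError("subclasses produce the output Asset")
```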

oliland (09/07/2019, 5:27 PM)
Indeed, the code seems flexible enough to allow this, with the exception of the cache mechanism. It's a little inflexible right now (you have to provide a timedelta to use it, which isn't really applicable in this context).

Jeremiah (09/07/2019, 5:32 PM)
Good point, the default caching mechanism is principally time-based. In truth I don’t think there’s anything preventing an unlimited cache with a custom cache validator; it’s just not a use case that’s come up much! You’d need to raise a Cached state yourself instead of relying on `cache_for` to do it for you. 🤔 There’s something interesting here.
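
One way an "unlimited" cache might look under Prefect 0.x semantics, as a sketch rather than a supported pattern: pass a very long `cache_for` so that a Cached state is produced at all, and let a custom validator ignore the expiration entirely (the `version_matches` helper and `bucket_version` parameter are hypothetical):

```python
import datetime

from prefect import task
from prefect.engine.state import Cached

def version_matches(state: Cached, inputs: dict, parameters: dict) -> bool:
    # Ignore the expiration timestamp and trust the cache for as long
    # as the bucket_version parameter is unchanged.
    cached = (state.cached_parameters or {}).get("bucket_version")
    current = (parameters or {}).get("bucket_version")
    return cached is not None and cached == current

# cache_for is still required so Prefect emits a Cached state; with
# the validator above, the actual duration is effectively moot.
@task(cache_for=datetime.timedelta(days=3650), cache_validator=version_matches)
def expensive_step(data):
    ...
```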
I don’t want to drag you too far into the guts of Prefect on day 1 😉

oliland (09/07/2019, 5:35 PM)
Thanks for the help! I'm curious to explore this further - will do some digging and share any insights

Jeremiah (09/07/2019, 5:36 PM)
Awesome, we’d love to hear your conclusions.

oliland (09/07/2019, 5:40 PM)
> you’d need to store the assets in a known place and we try to avoid forced state in Prefect
I think if you're using assets you'd be effectively embracing some state. So every initialised asset would require a path, and a way of checksumming itself

Jeremiah (09/07/2019, 5:40 PM)
We have a concept called `ResultHandlers` that could be reused here. Basically, they’re ways to interface Prefect with a known stateful location. As I mentioned, we try not to use forced state within the engine, but this is the exception that comes to mind.
ResultHandlers are used, for example, to take large dataframes and serialize them to S3 so that we have efficient ways to store references to data
They’re simple classes, just read / write
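
A sketch of a custom handler in the spirit of the built-in `LocalResultHandler` (the class name and directory here are hypothetical): `write` persists a result to a known location and returns a reference, and `read` loads it back:

```python
import hashlib
import os

import cloudpickle
from prefect.engine.result_handlers import ResultHandler

class LocalChecksumHandler(ResultHandler):
    """Writes results to a known directory, keyed by checksum."""

    def __init__(self, directory: str = "/tmp/prefect-assets"):
        self.directory = directory
        super().__init__()

    def write(self, result) -> str:
        data = cloudpickle.dumps(result)
        os.makedirs(self.directory, exist_ok=True)
        path = os.path.join(self.directory, hashlib.sha256(data).hexdigest())
        with open(path, "wb") as f:
            f.write(data)
        return path  # Prefect stores this reference in place of the data

    def read(self, path: str):
        with open(path, "rb") as f:
            return cloudpickle.loads(f.read())
```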

oliland (09/07/2019, 5:42 PM)
Great! Will check them out.

Chris White (09/07/2019, 6:10 PM)
This might be overly simplistic, but it sounds like you want to identify the version of your S3 bucket with a specific Flow Run (if the terminal tasks don’t succeed, that version hasn’t succeeded), which is precisely the pattern that Parameters were designed for. And if you wanted some extra caching logic, you could use an `all_parameters` cache validator on any relevant tasks to cache based on the value of this Flow Parameter.
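
A minimal sketch of the Parameter pattern with the built-in `all_parameters` validator (the flow and task names are placeholders, and the version string is a stand-in for an S3 VersionId):

```python
import datetime

from prefect import Flow, Parameter, task
from prefect.engine.cache_validators import all_parameters

# Cache purely on parameter values: the cache stays valid for as long
# as the bucket_version parameter doesn't change between runs.
@task(cache_for=datetime.timedelta(days=365), cache_validator=all_parameters)
def process(version: str):
    ...  # download and process this version of the bucket

with Flow("versioned-etl") as flow:
    version = Parameter("bucket_version")
    process(version)

# Each bucket version identifies a distinct flow run:
flow.run(parameters={"bucket_version": "3sL4kqQGQeaZuqTOXLBK"})
```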