# prefect-community
d
Hey all! A question I couldn't find addressed in the docs: is there a way to run a flow from a particular "entry point"? E.g. if you have a flow with sequential tasks A -> B -> C -> D, could you invoke the flow from task B or C? A concrete use case is a data processing pipeline where task A is time-consuming and most of the time doesn't need to be run.
I thought one approach could be to use a `Parameter` to define which task the flow should start from, and use signals to raise `SUCCESS` for each upstream task. But that would require adding a check to every task, which is a bit tedious for flows with many tasks.
j
Hi @David Norrish, Prefect has a caching mechanism for exactly this use case! You can indicate that a task's final state should be cached for a specific amount of time. This lets you skip expensive steps on later runs while the cached state is still valid.
We try to avoid the more general form (starting from an arbitrary task) because it introduces some major complexity in how users need to supply initial conditions (it was supported in older versions of Prefect but introduced more problems than it solved!)
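To make the time-based caching idea concrete, here is a toy, pure-Python model of it. It does not use Prefect: `ToyCached` and `run_with_cache` are hypothetical stand-ins (Prefect Core tasks of this era take a `cache_for` timedelta for this purpose, but check the caching docs for the exact semantics). The expensive function only runs when no prior cached state is supplied or the supplied one has expired.

```python
import datetime
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToyCached:
    """Hypothetical stand-in for a cached task state (not Prefect's class)."""
    result: Any
    cached_at: datetime.datetime
    cache_for: datetime.timedelta

    def is_valid(self, now: datetime.datetime) -> bool:
        # The cached result may be reused until cached_at + cache_for.
        return now < self.cached_at + self.cache_for


def run_with_cache(
    fn: Callable[[], Any],
    cache_for: datetime.timedelta,
    prior: Optional[ToyCached] = None,
    now: Optional[datetime.datetime] = None,
) -> ToyCached:
    """Run `fn` unless a still-valid prior cached state is supplied."""
    if now is None:
        now = datetime.datetime.now()
    if prior is not None and prior.is_valid(now):
        return prior  # skip the expensive work entirely
    return ToyCached(result=fn(), cached_at=now, cache_for=cache_for)


# Usage: count how often the "expensive" task A actually executes.
calls = []

def task_a():
    calls.append(1)
    return "expensive output"

t0 = datetime.datetime(2020, 1, 1, 12, 0)
hour = datetime.timedelta(hours=1)
first = run_with_cache(task_a, hour, now=t0)
second = run_with_cache(task_a, hour, prior=first,
                        now=t0 + datetime.timedelta(minutes=30))  # reused
third = run_with_cache(task_a, hour, prior=second,
                       now=t0 + datetime.timedelta(hours=2))      # expired, reruns
```

Here task A runs twice: once initially and once after the one-hour window has passed.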
d
Ah thanks! That's good to know
With the caching mechanism, what happens if you change the code for your flow? Would you need to manually invalidate the cache, or can Prefect somehow cleverly detect that this has happened?
j
The cache is invalidated if it expires or if certain conditions aren’t met (for example, if the parameters change). However, it must be provided as an initial `CachedState` for the task in question when you run the flow, so the choice of how to persist it and whether to provide it is up to you. If you want to forcibly invalidate it, you could simply run your flow with no initial states provided, and then the task would run as normal.
In other words, after you run the flow, a cached task will return a `Cached` state instead of a `Success` state. If you pass that back in as the initial state for the next run, it’ll be used appropriately.
d
Thanks a bunch @Jeremiah! I just found the relevant section of the docs (https://docs.prefect.io/guide/core_concepts/execution.html#caching), so will have a play and see if I can get it working as needed. I hadn't come across the concept of an "initial state". So if I understand correctly, one would execute the `Flow` and hold onto the `CachedState`. Then, to run a second time and skip the cached steps, you would pass any `CachedState`s as a list to the `tasks` parameter of `flow.run()`?
(If this is correct, have you got any thoughts about persisting a `CachedState` between executions of a script? Would you pickle it to a file or something?)
j
Let me defer to @Chris White for a recommendation there. They are JSON-serializable by design, but this starts to involve result handlers to serialize the result itself, and as I’m AFK I may not be able to explain it fully.
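One way to picture the split Jeremiah describes: the state's metadata goes to JSON, while the result itself is written somewhere else by a separate "result handler". Everything below (`ToyCached`, `save_result`, `load_result`, `state_to_json`, `state_from_json`) is a hypothetical pure-Python sketch, not Prefect's serializer or result-handler API. It uses `pickle` for the result, which is only safe for files you trust.

```python
import datetime
import json
import os
import pickle
import tempfile
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyCached:
    """Hypothetical cached state; only its metadata goes into JSON."""
    result_location: str          # where the result itself was written
    cached_at: datetime.datetime
    cache_for: datetime.timedelta


def save_result(value: Any, directory: str) -> str:
    """Hypothetical 'result handler': pickle the result to a file, return its path."""
    fd, path = tempfile.mkstemp(dir=directory, suffix=".pkl")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(value, f)
    return path


def load_result(path: str) -> Any:
    with open(path, "rb") as f:
        return pickle.load(f)


def state_to_json(state: ToyCached) -> str:
    return json.dumps({
        "result_location": state.result_location,
        "cached_at": state.cached_at.isoformat(),
        "cache_for_seconds": state.cache_for.total_seconds(),
    })


def state_from_json(text: str) -> ToyCached:
    data = json.loads(text)
    return ToyCached(
        result_location=data["result_location"],
        cached_at=datetime.datetime.fromisoformat(data["cached_at"]),
        cache_for=datetime.timedelta(seconds=data["cache_for_seconds"]),
    )


# Script execution 1: compute, then persist the result and the state metadata.
workdir = tempfile.mkdtemp()
state = ToyCached(save_result({"rows": 3}, workdir),
                  datetime.datetime(2020, 1, 1), datetime.timedelta(hours=6))
with open(os.path.join(workdir, "state.json"), "w") as f:
    f.write(state_to_json(state))

# Script execution 2: reload the state, then fetch its result separately.
with open(os.path.join(workdir, "state.json")) as f:
    restored = state_from_json(f.read())
restored_result = load_result(restored.result_location)
```

Keeping the result out of the JSON means the metadata stays small and human-readable, while large or non-JSON results can be stored however suits the pipeline.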
d
Thanks!
c
Hey @David Norrish! So, a few things:
- to specify a set of initial states, you can pass a dictionary of `{task: state}` to `flow.run` under the `task_states` keyword; this kwarg is passed down to the flow runner itself and is documented here: https://docs.prefect.io/api/unreleased/engine/flow_runner.html#prefect-engine-flow-runner-flowrunner-run
- for automatically persisting the cached state across scripts, we currently don’t expose a first-class way of doing that using only the open source engine, but this has come up before so it’s very much on my mind; if you were on the Prefect Cloud platform, we have a concept of “cache_key” that allows persistence across flows / flow runs in a robust way
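A toy model of the pattern Chris describes, for the A -> B -> C -> D pipeline from the original question: initial states are supplied as a dictionary keyed by task (here, plain functions), not as a list. `run_flow`, `ToySuccess`, and `ToyCached` are hypothetical and only mimic the behaviour discussed in this thread; with Prefect Core itself the call is `flow.run(task_states={task: state})`.

```python
import datetime
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

TaskFn = Callable[[Any], Any]  # hypothetical: a "task" is a one-argument function


@dataclass
class ToySuccess:
    result: Any


@dataclass
class ToyCached(ToySuccess):
    cached_at: datetime.datetime
    cache_for: datetime.timedelta

    def is_valid(self, now: datetime.datetime) -> bool:
        return now < self.cached_at + self.cache_for


def run_flow(
    tasks: List[TaskFn],
    task_states: Optional[Dict[TaskFn, ToySuccess]] = None,
    cache_for: Optional[Dict[TaskFn, datetime.timedelta]] = None,
    now: Optional[datetime.datetime] = None,
) -> Dict[TaskFn, ToySuccess]:
    """Run a linear pipeline, reusing any still-valid cached initial states."""
    if now is None:
        now = datetime.datetime.now()
    task_states = task_states or {}
    cache_for = cache_for or {}
    states: Dict[TaskFn, ToySuccess] = {}
    upstream = None
    for t in tasks:
        initial = task_states.get(t)
        if isinstance(initial, ToyCached) and initial.is_valid(now):
            state = initial  # skip the task and reuse its cached result
        elif t in cache_for:
            state = ToyCached(t(upstream), now, cache_for[t])
        else:
            state = ToySuccess(t(upstream))
        states[t] = state
        upstream = state.result  # feed the result to the next task
    return states


executed: List[str] = []

def a(_):
    executed.append("A")
    return 1

def b(x):
    executed.append("B")
    return x + 1

def c(x):
    executed.append("C")
    return x * 10

def d(x):
    executed.append("D")
    return x - 5

t0 = datetime.datetime(2020, 1, 1, 9, 0)
pipeline = [a, b, c, d]
cache = {a: datetime.timedelta(hours=12)}

first = run_flow(pipeline, cache_for=cache, now=t0)
executed.clear()
# Second run: hand A's cached state back in, keyed by the task itself.
second = run_flow(pipeline, task_states={a: first[a]}, cache_for=cache,
                  now=t0 + datetime.timedelta(hours=1))
```

On the second run only B, C and D execute, and D still receives the same upstream value, so its result matches the first run.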
d
Thanks a bunch @Chris White 🙂 That's super handy to have explained. We're still in the "poking around and trying it out" stage, so not on Prefect Cloud yet, but that's definitely a possibility for down the line! Shall see how I go with this approach
c
yea that sounds good, and definitely let us know if the patterns outlined here don’t fit your use case!
d
Will do!
c
@Marvin archive “How to run a Flow starting from a particular task?”