# ask-community
**Gabe Grand**

Hi team, I have a question about structuring flow-of-flows. I have a linear ML pipeline with multiple steps, each of which has been written as a separate flow:
```
flow_1 (preprocessing) -> flow_2 (training) -> flow_3 (deployment)
```
For convenience, I’ve created a flow-of-flows that runs all of these steps in sequence:
```python
# Flow-of-flows definition
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

flow_1 = StartFlowRun(flow_name="preprocessing", project_name="examples", wait=True)
flow_2 = StartFlowRun(flow_name="training", project_name="examples", wait=True)
flow_3 = StartFlowRun(flow_name="deployment", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    r1 = flow_1()
    r2 = flow_2(upstream_tasks=[r1])
    r3 = flow_3(upstream_tasks=[r2])
```
Now, let’s say that I have a new `flow_0` that checks an external data source for new data on some routine schedule. I want to have `flow_0` trigger the rest of the pipeline, but only when there is new data. I’m considering the following two solutions:

**SOLUTION 1:** My first instinct was to add `flow_0` to the flow-of-flows, but I haven’t been able to find a clean way to use the result of `flow_0` to determine whether to run the rest of the pipeline. Digging further, it seems like passing data between flows in a flow-of-flows is not yet supported, but it’s possible that I’m not fully understanding the situation.

**SOLUTION 2:** An alternative would be to append a `StartFlowRun(flow_name="parent-flow")` as the final task of `flow_0` and make it conditional on whether there is new data. I know this will work, but it feels less clean than having all of the flows inside a single `parent-flow` pipeline that runs on a schedule.

Does anyone know if S1 is feasible, or if there is some alternative to these two solutions that I’m missing? Thanks!
**Kevin Kho**

Hello @Gabe Grand, S1 is kinda feasible. You would persist the data collected somewhere. flow_1 would check for it and then raise the `SKIP` signal if there is nothing. You could also use `case`, I think.

S2 works. Include flow_0 in the script with the 3 flows and run them as separate tasks so you have the output to decide whether or not to trigger flows 1-3.
**Gabe Grand**

Hi @Kevin Kho - thanks for the swift follow-up! For S1, is there any way to get `flow_0` to pass a parameter to `flow_1` (e.g., the S3 location of the data that needs to get preprocessed)?
**Kevin Kho**

I don’t think you can explicitly pass the same path (let’s say I upload to S3, get the filename that was created, and then pass it down). I would probably try to achieve the dynamic paths with templating: https://docs.prefect.io/core/concepts/templating.html#formatting-dates

It might be easier, though, if you prepend flow_0 inside flow_1.
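For example, with a templated result location, both flows format the same path template from Prefect context at runtime, so they can agree on a location without passing it between flows. A minimal sketch (the bucket name and path template here are assumptions):

```python
from prefect.engine.results import S3Result

# Hypothetical bucket/template: both flow_0 and flow_1 can format
# "preprocessing/{today}/data.parquet" from Prefect context at runtime,
# so they resolve the same S3 location without passing it explicitly.
result = S3Result(
    bucket="my-ml-bucket",
    location="preprocessing/{today}/data.parquet",
)
```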
**Gabe Grand**

Right, I was also thinking it might make sense to merge flow_0 + flow_1 to avoid having to pass the S3 location across flows. That said, it still leaves the problem of how to skip the rest of the pipeline if there isn’t any new data.
**Kevin Kho**

You can `raise SKIP` under an if clause that checks whether there is data: https://docs.prefect.io/core/concepts/states.html#skip-while-running

SKIP will skip downstream tasks and mark them as SUCCESS.

There is an example snippet showing how to use it here: https://docs.prefect.io/core/getting_started/next-steps.html#signals
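A minimal sketch of that pattern (the data check itself is a placeholder assumption):

```python
from prefect import task
from prefect.engine.signals import SKIP

@task
def check_for_new_data():
    # placeholder: query the external data source here
    new_files = []
    if not new_files:
        # SKIP skips this task and everything downstream of it in the flow
        raise SKIP("No new data found; skipping the rest of the pipeline.")
    return new_files
```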
**Gabe Grand**

Ahh, this looks like exactly what I needed - thanks so much @Kevin Kho!
👍 1
**John Urbanik**

Would it also be possible to wrap `prefect.client.client.Client.get_flow_run_state` in a task and use that to retrieve the result of `flow_0`, so as to avoid having to modify `flow_0` to persist the data elsewhere? I can try it out this evening, but was wondering if someone knew offhand.
**Kevin Kho**

Hi @John Urbanik! This is certainly worth trying. You mean configuring the Result for flow_0, right? If it’s quick for you to try, I’d certainly appreciate it.
**John Urbanik**

@Kevin Kho Just getting around to this now. Yes, I was thinking about situations where `flow_0` may be owned by another team, so changing the Result class / using templating isn’t acceptable and you might not actually know the location where the data is stored. I started playing around with it, and a few issues popped up:

1. `StartFlowRun` raises a signal on `wait=True` instead of returning a string as suggested in the documentation. I can get the `run_id` by doing a little string parsing, but it is messy.
2. I was wrong about the method; I need `get_flow_run_info` in order to actually get the state of the individual tasks.
3. This only gets the locations for given task slugs.

Here is a minimal demo:
```python
import prefect
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun
from prefect.engine.results import PrefectResult

@task
def add(x, y):
    return x + y

@task
def get_results_from_flow_run_for_slugs(flow_run_signal, slugs):
    client = prefect.Client()
    # StartFlowRun(wait=True) raises a signal whose state message starts
    # with the flow run id, so parse the id out of the message string
    flow_state = client.get_flow_run_info(flow_run_signal.state.message.split(' ')[0])
    flow_tasks = flow_state.task_runs
    # read the persisted result for each task slug we care about
    params = {
        tr.task_slug: tr.state._result.read(tr.state._result.location)
        for tr in flow_tasks if tr.task_slug in slugs
    }
    return params

with Flow("flow_0", result=PrefectResult()) as flow:
    result = add(3, 5, task_args=dict(name="get_this"))

flow.register(project_name="ju-testing")

first_flow = StartFlowRun(flow_name="flow_0", project_name="ju-testing", wait=True)

with Flow("parent", result=PrefectResult()) as p_flow:
    flow_run_signal = first_flow()
    res = get_results_from_flow_run_for_slugs(flow_run_signal, slugs=[result.slug])

p_flow.register(project_name="ju-testing")
```
If there is interest, I can probably make a few PRs to add a task to this effect (I’d really rather not be relying on this string splitting, though). I feel like there is probably a more elegant solution, so I won’t be bothered if not!
**Kevin Kho**

Actually, regarding StartFlowRun, there is this pull request that may change things: https://github.com/PrefectHQ/prefect/pull/4245
👀 1
🙌 1
So with this, we’d be able to get the flow information and then, I guess, get task results from it (a bit roughly).
**John Urbanik**

Sweet, that looks like it’d be a lot cleaner + cut out a request.
**Kevin Kho**

Do you still see room for a task that connects flows to upstream tasks via the slugs? Thanks for exploring this, btw. It’s very informative!

But I must say I think we are moving towards a direction where we want to support passing results between flows (just no timeline yet).
👍 2
**John Urbanik**

I think in practice, I’d want to have the task return a dict of parameters that can be passed to the next flow in the sequence (i.e. map the slug of the old flow’s task to a parameter name of the new flow), but I didn’t code that up just to keep things minimal. I could also probably subclass `StartFlowRun` so that it wraps this logic inside.
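A rough sketch of that idea, reusing the signal parsing and result reading from the demo above (the helper name and the slug-to-parameter mapping are assumptions, not an existing Prefect API):

```python
import prefect
from prefect import task

@task
def results_as_params(flow_run_signal, slug_to_param):
    """Hypothetical helper: map task slugs from a finished child flow
    to parameter names for the next flow in the sequence."""
    client = prefect.Client()
    run_id = flow_run_signal.state.message.split(" ")[0]  # same parsing as above
    info = client.get_flow_run_info(run_id)
    return {
        # .read() returns a Result object; .value holds the deserialized data
        slug_to_param[tr.task_slug]: tr.state._result.read(tr.state._result.location).value
        for tr in info.task_runs
        if tr.task_slug in slug_to_param
    }
```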
**Gabe Grand**

I second that @John Urbanik - it’d be really helpful to have `StartFlowRun(..., wait=True)` return a dict of per-task return values that can be passed to subsequent flows.
👍 1
Hey @Kevin Kho, I took your suggestion and have reconfigured my pipeline to `raise SKIP` if there’s no new data. This works well for skipping the remaining tasks within `step_1` (I also took your suggestion to merge steps 0 + 1 to avoid having to pass data between them). However, within the flow-of-flows, when `step_1` raises SKIP, the subsequent `step_2` and `step_3` pipelines still get run. I’m assuming it’s because these are separate flows that get called with `StartFlowRun`. Is there any way to make these downstream flows respect the `SKIP` signal?
**Zanie**

Hey @Gabe Grand -- just to clarify, you're raising a SKIP in a task in your first flow and you want your orchestrator flow (flow of flows) to skip flows two and three if this occurs?
**Gabe Grand**

Yes, exactly @Zanie!
**Kevin Kho**

Hey @Gabe Grand! Talked to Michael about this. This is a bit more involved than expected because the `SKIP` is causing `step_1` to be labelled a success. We had a recent change that will help us propagate the `SKIP`, but I need to test it out. Let me work on an example for you.
**Gabe Grand**

Thanks @Kevin Kho and @Zanie for giving some thought to this. At face value, it’d be great if the final state of a `StartFlowRun(…, wait=True)` task could reflect the `SKIP` signal, though I’m obviously not sure how much complexity that will involve. If you have an example I can try out or a branch I can fetch, that’d be great.
**Zanie**

The problem is that the SKIP signal is a task state, and (generally) just because some tasks skipped does not mean the flow should also finish in that state.

We should be able to use the new flow "terminal state handler" to do this. Kevin's working on an example for you.
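For reference, a terminal state handler receives the flow, its proposed final state, and the states of its reference tasks, and can return a replacement state. A sketch of how SKIP propagation might look, assuming the Prefect 0.14+ handler signature (this is not Kevin's actual example):

```python
from prefect import Flow
from prefect.engine.state import Skipped, State

def propagate_skip(flow: Flow, state: State, reference_task_states: set) -> State:
    # if any reference task ended Skipped, finish the whole flow as Skipped
    if any(isinstance(s, Skipped) for s in reference_task_states):
        return Skipped("A reference task was skipped; marking the flow as Skipped.")
    return state

# attached to the child flow at construction time
flow = Flow("step_1", terminal_state_handler=propagate_skip)
```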
**Kevin Kho**

Hey @Gabe Grand, I may have to get back to you tomorrow with a working example for this. I won’t forget this though!
**Gabe Grand**

@Zanie thanks for the context! Are there any cases where a flow will finish in SKIPPED? From the docs, I was a little unclear about the relationship between task and flow states.

@Kevin Kho sounds like a plan! Looking forward to seeing what this will look like.
**Zanie**

Flows are basically Success/Fail based on the state of their terminal tasks. SKIPPED is treated as a Success state, so that's the state the flow is assigned.
👍 1
This looks like it'll take some changes on our end to deliver, so I don't think we'll be delivering this super soon. In the meantime, I'd recommend writing a signifying file from your first flow when you raise your SKIP, then having a task in your orchestrator flow, inside a `case` statement, check for the existence of the file.
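A sketch of that workaround (the marker path and task/flow names are assumptions):

```python
from pathlib import Path
from prefect import Flow, case, task
from prefect.tasks.prefect import StartFlowRun

# hypothetical marker file that step_1 writes just before raising SKIP
MARKER = Path("/shared/no_new_data")

@task
def new_data_available() -> bool:
    return not MARKER.exists()

step_2 = StartFlowRun(flow_name="training", project_name="examples", wait=True)
step_3 = StartFlowRun(flow_name="deployment", project_name="examples", wait=True)

with Flow("parent-flow") as flow:
    has_data = new_data_available()
    # downstream flows only run when the marker file is absent
    with case(has_data, True):
        r2 = step_2()
        r3 = step_3(upstream_tasks=[r2])
```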
**Gabe Grand**

Got it, thanks for the heads up. That’s certainly a doable workaround for this case, though it’s less optimal for some other use cases I’ve been thinking about. If you have time, it might be productive for us to set up a call to discuss broader design considerations for Prefect core. Feel free to DM me if this is something you’d be interested in.

In particular, the primary reason we’ve ended up using flow-of-flows (as opposed to one monolithic flow) is that different segments of the pipeline require different execution environments (in our case, preprocessing and ML training require different Docker images and hardware setups on K8s). However, in working with Prefect over the past few months, I’ve seen that there’s still a lot of active discussion within the community over the role of flow-of-flows and passing data between flows in the framework. I’d be happy to share my thoughts/experiences with you if it’d be helpful for determining development priorities.
**Jeremy Tee**

@John Urbanik I came across this thread and was trying out the code you have shown above; however, I hit an error that `'Success' object has no attribute 'read'`. Any ideas on why?
**Zanie**

@Gabe Grand we've got some ideas, but I'll keep it in mind! I also have a background in work that would take a bunch of different execution environments, and I feel like having a flow of K8s Jobs may be the way I'd go about solving it rather than a flow of flows (mostly because the support isn't there yet).

Note another solution would be to just query the API for the task run that you care about and check if it was skipped.
**John Urbanik**

@Jeremy Tee whoops, I copied over an incorrect version of the task. Fixed now.
🙌 1
**Zanie**

It's worth pointing out that

> it’d be really helpful to have `StartFlowRun(..., wait=True)` return a dict of per-task return values that can be passed to subsequent flows

is not directly feasible, because many of our customers have flow runs with >10,000 tasks, which we could not feasibly collect and return like that.
**John Urbanik**

@Zanie Of course. I’d handle it somewhat like what I’ve done above: pass `StartFlowRun` a list of tasks/slugs that you want to return the results for. Internally, maybe they’re mapped over with tasks so that you can parallelize the requests if it is still a relatively large number. Another option would be to add another form of key task. At my company, we’re overloading `reference_tasks` as a way to get the set of tasks to pass to the next flow in our sequence.
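For context, reference tasks in Prefect 0.x normally determine the flow's final state; the overloading described above repurposes that same set. A minimal illustration, reusing `flow` and `result` from the earlier demo:

```python
# Prefect 0.x: reference tasks normally determine the flow's final state.
# Overloading them means reading this same set back to decide which task
# results get passed to the next flow in the sequence.
flow.set_reference_tasks([result])
tasks_to_pass = flow.reference_tasks()
```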
**Zanie**

That makes sense 🙂 we've got some ideas for loading things lazily/on-demand as well.

Interestingly, the `FlowRunner` has an idea of `return_tasks` that's basically unused when running flows with the API.