# prefect-community
b
Following up on subflows. I was wondering if this is still an active area of development. I know I asked about this before (https://prefect-community.slack.com/archives/CL09KU1K7/p1586212582026500), but it looks like there hasn’t been activity on the linked GitHub issue in a little while. It would be very valuable for us to have subflows as first class objects. One major use case is that we’d like to have a single production version of a flow, and then we’d like for other code to be able to modify that flow (e.g. by swapping out a large subgraph for another subgraph that has the same keyed upstream and downstream edges). It seems to me that the most natural way to achieve this is to have nested subgraphs of tasks within a flow. I’ve toyed around with an implementation that I think would work, and would love to know if there is any appetite from all y’all at `prefect` for something like this. The idea is that currently `Flow` objects have two major responsibilities: (1) building/maintaining a DAG and (2) supporting execution of that DAG. I propose splitting that functionality into two classes: `Flow`, which would support DAG execution, and `Composite` (or some other name), which builds and maintains a DAG. Crucially, `Composite` instances could be nested (i.e. `Composite.add_task` would accept either a `Task` or another `Composite` instance, and similarly for `replace`, `add_edge`, etc). To run the DAG, `Flow` would traverse the nested graph and build up a flattened DAG that just contains `Task` nodes, and then execute as usual.
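[Editor's sketch] The nest-then-flatten idea described above could look roughly like the following. This is plain Python and does not use Prefect; `Task`, `Composite`, and `flatten` here are hypothetical stand-ins, not the proposal's actual implementation. As a simplifying assumption, an edge into a nested `Composite` attaches to all of its root tasks and an edge out of it attaches to all of its terminal tasks, rather than to keyed edges.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    """Hypothetical stand-in for a task node (not Prefect's Task class)."""
    name: str


class Composite:
    """Hypothetical nested DAG builder: nodes are Tasks or other Composites."""

    def __init__(self, name):
        self.name = name
        self.nodes = []
        self.edges = []  # list of (upstream, downstream) pairs

    def add_task(self, node):
        if node not in self.nodes:
            self.nodes.append(node)
        return node

    def add_edge(self, upstream, downstream):
        self.add_task(upstream)
        self.add_task(downstream)
        self.edges.append((upstream, downstream))

    def replace(self, old, new):
        # Swap a node (Task or whole Composite) while keeping its edges.
        self.nodes = [new if n == old else n for n in self.nodes]
        self.edges = [(new if u == old else u, new if d == old else d)
                      for u, d in self.edges]


def _boundary(node, pick_roots):
    """Leaf tasks with no upstream (roots) or no downstream (terminals)."""
    if isinstance(node, Task):
        return [node]
    index = 1 if pick_roots else 0
    connected = {edge[index] for edge in node.edges}
    found = []
    for child in node.nodes:
        if child not in connected:
            found.extend(_boundary(child, pick_roots))
    return found


def flatten(node):
    """Return the set of Task-to-Task edges implied by a nested Composite."""
    if isinstance(node, Task):
        return set()
    edges = set()
    for child in node.nodes:
        edges |= flatten(child)
    for upstream, downstream in node.edges:
        for t in _boundary(upstream, pick_roots=False):
            for r in _boundary(downstream, pick_roots=True):
                edges.add((t, r))
    return edges


extract, clean, enrich, load = (Task(n) for n in ("extract", "clean", "enrich", "load"))
quick = Task("quick")

transform = Composite("transform")
transform.add_edge(clean, enrich)

fast_transform = Composite("fast_transform")
fast_transform.add_task(quick)

pipeline = Composite("pipeline")
pipeline.add_edge(extract, transform)
pipeline.add_edge(transform, load)

edges_before = flatten(pipeline)
pipeline.replace(transform, fast_transform)  # swap the whole subgraph
edges_after = flatten(pipeline)
```

Swapping `transform` for `fast_transform` touches only the outer graph; the flattened edge set is recomputed from the nested structure each time.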
n
Hi @Ben Fogelson - thanks for carrying this issue forward! Given that your proposal would pretty fundamentally change a major fixture in Core, I think it's most appropriate to move this conversation to a GitHub discussion or to the issue where subflows are being tracked. To respond more acutely, I don't think we've seen a use case yet that wouldn't be solved by more effectively modularizing code. That said, that doesn't mean the use case doesn't exist, so I'd encourage you to contribute your use cases to that thread. 😄
j
@Ben Fogelson your description is actually very similar to how Prefect works today, with different names: `Flows` are used to build the DAG; and `FlowRunners` are used to execute it.
b
Hi @Jeremiah, fair point. Would a better way to say it be that `Flows` build the DAG and also have information about how to build and register it so it is ready to be run? E.g. `environment`, `storage`, `register()`
j
Indeed, Flows represent the physical structure and description, and FlowRunners use that information to execute it
b
Thanks @nicholas! Do the docs have any examples of effective modularization patterns that have successfully solved these types of issues? I haven’t seen any but might not be looking in the right place
j
However, (despite trying!) we have been unable to motivate a use case that necessitates nested flows (subflows). All use cases we’ve seen so far are solved by using factory functions and other Python convenience classes - it might be possible for you to implement your `Composite` class completely independent of Prefect, as a way of generating your final flow.
b
Yeah I was starting to lean that way
j
I think the tough thing is we completely recognize why subflows are attractive, but we’ve struggled to come up with an API for them that doesn’t just devolve into factory functions, because at the end of the day they do get flattened
One thing we are working on is better ways to group / visualize tasks (through `tags`) because a major part of this is the mental burden of keeping track of groups of tasks - look for that soon
b
The use case I’m really struggling with is how to modify an existing `Flow` in a modular fashion. I totally get how factory functions are fantastic for modularization in the `Flow` construction process, but for modification something like a nested structure still feels useful. Would love to hear thoughts on other ways to do it though
I think maybe your suggestion of having `Composite` be separate could work
It is essentially keeping a nested DAG structure on its own and then flattening it to feed it into a `Flow`
i
Hi @Ben Fogelson - Here is the current pattern I use to flatten out flows.
1. Write each flow independently
2. `flow_update(merge_parameters=True)`
3. Run a `remove_middle_parameters` script
Steps 2 and 3 are combined in `build_flow.py`. See gist for more details... https://gist.github.com/gryBox/da862d89ad0df49c02cc9b86e334b23c Let me know if you need help understanding it. Can certainly be improved. Currently using this on 4 flows together.
j
@Ben Fogelson we usually don’t see modifications of flow objects that have already been built (at least, not destructive actions that remove nodes/edges - in fact the `Flow` object doesn’t really contemplate this possibility). I think this would be an appropriate place to build your own “Flow Factory” (whether a function, a class, a DSL etc.) that allows you to put all the building blocks together however you want to produce the “final” flow you want. Sort of one step higher than the Prefect API
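[Editor's sketch] A "flow factory" in the sense suggested above might be as small as a function that takes the swappable building block as an argument and rebuilds the final pipeline, instead of mutating one that already exists. The sketch below is plain Python with no Prefect calls; `build_pipeline`, `default_transform`, `experimental_transform`, and `run` are hypothetical names, and a linear list of steps stands in for a real DAG.

```python
from typing import Callable, List

Step = Callable[[int], int]


def default_transform() -> List[Step]:
    """The 'production' subgraph: add one, then double."""
    return [lambda x: x + 1, lambda x: x * 2]


def experimental_transform() -> List[Step]:
    """A drop-in alternative with the same input and output shape."""
    return [lambda x: x * 10]


def build_pipeline(transform_factory: Callable[[], List[Step]] = default_transform) -> List[Step]:
    """Assemble extract -> transform steps -> load from building blocks."""
    extract: Step = lambda _seed: 3   # pretend data source
    load: Step = lambda x: x          # pretend sink, passes the value through
    return [extract, *transform_factory(), load]


def run(pipeline: List[Step], seed: int = 0) -> int:
    """Execute the steps in order, feeding each result to the next step."""
    value = seed
    for step in pipeline:
        value = step(value)
    return value


production_result = run(build_pipeline())
experimental_result = run(build_pipeline(experimental_transform))
```

The production definition stays untouched; other code gets a variant by calling the factory with a different block, which fits the append-only view of flows.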
i
lol @Jeremiah I take it you're not a fan of the process presented.
j
Haha @itay livni more philosophical — I think of flows as pseudo-immutable (or append only)
m
with my (limited) experience so far I've found that flow calling serves most of my subflow-like problems. If all of your potential plug-in flows have a relatively fixed set of parameters that you discover through the outer flow, then it's simple enough to make the `project_name` and `flow_name` of the 'plug-in' flow parameters on the outer flow.
This also points to a relatively elegant solution to dynamic flows of flow recursion - if you have a dynamic DAG-like structure you want to operate on, then recursively calling your own flow with a child node allows you to break it into subflows.
i
@Matt Wong-Kemp Do you have a toy example you can share?
m
let me see if I can knock one up.
🚀 1
So I said I'd have a go at putting an example together of using `FlowRunTask(wait=True)` for subflow-running - here's a recursive flow that calls itself to calculate the fibonacci series value for a given `n`: https://gist.github.com/emcake/032cef5147c468f0074c2cfe7d1703c4 The first caveat is this is obviously slightly mad. I'm also spotting some slightly weird behaviour where I can't get it to recurse properly on a local agent due to some weird parallelism thing, but I've definitely seen this work on previous versions. The second caveat is that it doesn't calculate the fibonacci series! @Chris White your recent change to use an idempotency key in `FlowRunTask` of the run id means that, rather than calling the flow again for `n-1` and `n-2`, it calls it for whichever one wins and then the loser just gets the winner's value. Is there any chance we can make the idempotency key a parameter into the `FlowRunTask` that uses the run id as the default? This way I could come up with a context-specific run id, like `fib-{n}` that'd even get me memoizing fibonacci runs. :)
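[Editor's sketch] The idempotency-key behaviour described above can be imitated with a toy model. This is not Prefect code: `FakeBackend` is a hypothetical stand-in for a server that returns the existing run whenever it sees a key it already knows, and the calls run sequentially, so "whichever one wins" is always the first call here.

```python
class FakeBackend:
    """Toy stand-in for a backend that deduplicates runs by idempotency key."""

    def __init__(self):
        self.runs = {}        # idempotency key -> result of the run
        self.executions = 0   # how many runs actually executed

    def create_run(self, key, func, *args):
        if key not in self.runs:
            self.executions += 1
            self.runs[key] = func(*args)
        return self.runs[key]


def fib_shared_key(backend, n, run_id="root"):
    """Both child runs use the parent's run id as key, so the second is deduplicated."""
    if n < 2:
        return n
    child_id = run_id + "/child"
    a = backend.create_run(run_id, fib_shared_key, backend, n - 1, child_id)
    b = backend.create_run(run_id, fib_shared_key, backend, n - 2, child_id)  # gets a's value
    return a + b


def fib_keyed(backend, n):
    """Context-specific keys such as fib-{n}: distinct children, memoized repeats."""
    if n < 2:
        return n
    a = backend.create_run(f"fib-{n - 1}", fib_keyed, backend, n - 1)
    b = backend.create_run(f"fib-{n - 2}", fib_keyed, backend, n - 2)
    return a + b


shared_result = fib_shared_key(FakeBackend(), 5)

keyed_backend = FakeBackend()
keyed_result = fib_keyed(keyed_backend, 10)
```

With the shared key, every level doubles the winner's value instead of summing two different subproblems. With `fib-{n}` keys, each subproblem executes once and later requests reuse the stored result.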
🤯 2
c
This is cool to see! I could fix your exact call pattern by including the map index in the idempotency key; I’m a little nervous that people will not understand its usage if it’s exposed (especially in extreme edge cases where the process dies and is retried by the zombie killer service - it’s important to understand the idempotency key in that situation) Would including the map index work for your use case or do you still want full control of the key?
m
For the 'real' use case I want to use this for I need the key, unfortunately.
👍 1
j
Wow this is so cool
m
however the deeper learning here is that flows are just functions/tasks/members of the same functor class as tasks, they take one input and (with some abuse) produce one output
upvote 2
c
https://github.com/PrefectHQ/prefect/issues/2997 i should be able to get to this in the AM 👍
j
👌
the deeper learning here is that flows are just functions
This is a really insightful point @Matt Wong-Kemp
m
one thing that might make this a little nicer is the idea of a flow returning a result, like taking all reference tasks and tupling their results
j
We’ve discussed that many times but never quite gotten it prioritized (previously, there wasn’t really a way to take advantage of it programmatically)
but yes the `Success` state of a flow could just reflect its reference tasks
m
the deeper learning here is that flows are just functions
🚨 nerd alert 🚨 here but technically flows are members of the same selective-applicative-functor typeclass as tasks, since they also carry state and all of the flow control options that tasks do :)
marvin 1
🤓 2
c
blog post idea: deconstruct the Prefect ecosystem with Category Theory / Type Theory
i
@Matt Wong-Kemp Wow! Thank you for sharing
m
for completeness, here is plugin flows, where you use some dynamic value to choose which inner flow to run: https://gist.github.com/emcake/bf53d8484fbb33c7b14bd1940eb9f39c if all of your possible subflows conform to some simple interface (take in parameter `x`, output result in task with slug `result`) then you can treat the inner part as a black box you don't really care about.
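[Editor's sketch] The plug-in interface described above (parameter `x` in, a value under the slug `result` out) can be illustrated without Prefect. In the sketch below, `REGISTRY`, `outer_flow`, and the two inner flows are hypothetical; a dictionary lookup stands in for resolving a registered flow by `project_name` and `flow_name`.

```python
from typing import Callable, Dict, Tuple

# Each inner "flow" takes parameter x and returns its task results keyed by slug.
InnerFlow = Callable[..., Dict[str, int]]

REGISTRY: Dict[Tuple[str, str], InnerFlow] = {
    ("demo", "double"): lambda x: {"result": x * 2},
    ("demo", "square"): lambda x: {"result": x * x},
}


def outer_flow(project_name: str, flow_name: str, x: int) -> int:
    """Pick the inner flow from dynamic values and treat it as a black box."""
    inner = REGISTRY[(project_name, flow_name)]
    task_results = inner(x=x)
    return task_results["result"]  # only the agreed slug is read


doubled = outer_flow("demo", "double", 7)
squared = outer_flow("demo", "square", 7)
```

The outer flow depends only on the agreed interface, so a new plug-in can be added by registering another entry.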