# prefect-community
alexandre kempf
Hello guys! I'm wondering if it is possible to run a flow inside a flow! The main idea would be to have a flow (flowModel) that is responsible for training a model, and in between the loading and the training parts, run a flow for data preprocessing (flowPreproc). My main problem is that I'm passing a task to my second flow instead of a value (see the example). I cannot think of a way to do it, and I don't know if I lack skills with Prefect or if this is bad practice. Thank you in advance! 🙂 Keep up the good work, your tool is amazing 🙂 Here is a minimal example:
Copy code
from prefect import task, Parameter, Flow
import numpy as np

@task
def augment_data(a, b=10):
    return np.sqrt(a)+np.sqrt(b)

with Flow("augment") as flowAug:
    a = Parameter("a")
    aug = augment_data(a, 10)

@task
def load_data(c):
    return c

@task
def train_model(model, d):
    print("Training, {} with {}!".format(model, d))
    return model


with Flow("training") as flowModel:
    init = Parameter("c")
    model = Parameter("model")
    data = load_data(init)
    state = flowAug.run(a=data)  # problem: at this point `data` is a Task, not a concrete value
    aug_data = state.result[aug].result
    result = train_model(model, aug_data)

state_model = flowModel.run(c=5, model="SVM")
I read the previous thread about nested flows a bit more, and I realize that my case is a bit different. A bit like functions in many languages have locals and globals, I would like to have the same in my flows. I'm trying something a bit crazy, but if there is an easy way I'm down for it 🙂
josh
I'm not sure of an easy way to do what you're trying to accomplish, but is there a reason you aren't running your augment flow inside of a task? Something like:
Copy code
@task
def run_flowAug(data):
    return flowAug.run(a=data)
And then add that task to your training flow.
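Put together, that suggestion might look something like the sketch below — a sketch only, assuming Prefect 1.x behaviour, where flow.run() accepts parameters as keyword arguments and a task's output is read back via state.result[task].result:
Copy code
@task
def run_flowAug(data):
    # The sub-flow runs when this task runs, i.e. when `data` is already a
    # concrete value rather than a Task; then pull out the result of `aug`.
    state = flowAug.run(a=data)
    return state.result[aug].result

with Flow("training") as flowModel:
    init = Parameter("c")
    model = Parameter("model")
    data = load_data(init)
    aug_data = run_flowAug(data)
    result = train_model(model, aug_data)

state_model = flowModel.run(c=5, model="SVM")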
Dylan
Hey @alexandre kempf, I'm interested in your intention here. Are you trying to share code between different flows? As Josh pointed out, it would be easy to turn one of your flows into a task or a series of dependent tasks here. In which case, if you're trying to share functionality between flows, you could easily share the Python code for the tasks as a package or file and then import that file into both flows.
Basically, is this an elegance or a functionality question?
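To make that code-sharing idea concrete, a rough sketch — the module name augment_tasks.py and the file layout are made up here, just one possible way to split things:
Copy code
# augment_tasks.py -- shared task definitions, importable from any flow
from prefect import task
import numpy as np

@task
def augment_data(a, b=10):
    return np.sqrt(a) + np.sqrt(b)

# cats_dogs_flow.py -- one flow reusing the shared task (next week: house/bridge)
from prefect import Flow, Parameter
from augment_tasks import augment_data

with Flow("augment") as flowAug:
    a = Parameter("a")
    aug = augment_data(a, 10)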
alexandre kempf
@josh This is what I'm trying to do right now. I'm trying to figure out how to reuse the result of this task afterwards. I have two versions of code that are very similar; one works while the other crashes, and I'm currently investigating it! I might ask you some questions after I've done some debugging, if you're ok with that 🙂
@Dylan You summarized the problem pretty well! I want to have subflows in order to reuse them. The perfect example is data augmentation. For a given computer science task, when I'm training a model on image classification (cats/dogs for instance), I will use a Prefect flow to modify my dataset just before training. It should be a subflow because I want to run it on every epoch (I cannot do it once at the beginning of my training and then launch another flow). Another reason why I want it as a subflow is that next week, if I'm working on a house/bridge classifier, I want to be able to reuse the exact same augmentation and thus just run this subflow again without coding it or copy-pasting it. So those are the motivations 🙂 Now in practice I created a parser to transform YAML into flows automatically; the DAG is specified in the YAML and it works like a charm. I'm struggling to treat another YAML file as a task (which should not be hard in practice, as josh said). However I think I'm getting confused with the result of the subflow example: if I do
Copy code
@task
def sumsqrt(a, b):
    return np.sqrt(a)+np.sqrt(b)
(note that I'm using sqrt in order to avoid the add operation from Prefect, which can hide a bug if a and b are tasks and not floats or ints). The value I return is not a task in my function definition, but in the flow it is. Therefore I tried something like
Copy code
@task
def run_flowAug(data):
    state = flowAug.run(a=data)
    return state.result["aug"].result
so I return something that is not a task. And in the main flow it is NOT considered a task. As you see I'm a bit lost 😛 But I don't want to take too much of your time, so I'll try to figure out what is wrong on my side and then ask more specific questions 🙂
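One thing that might be at the root of the task-vs-value confusion — a small illustration, assuming Prefect 1.x semantics:
Copy code
from prefect import task, Flow
import numpy as np

@task
def sumsqrt(a, b):
    return np.sqrt(a) + np.sqrt(b)

# Calling .run() executes the underlying function right away and returns a value:
print(sumsqrt.run(4, 9))          # 5.0, a plain float

# Calling the task inside a Flow context only adds a node to the DAG, so what
# comes back is a Task; it only produces a value once the flow itself runs:
with Flow("demo") as flow:
    out = sumsqrt(4, 9)           # `out` is a Task here, not 5.0

state = flow.run()
print(state.result[out].result)   # 5.0, available after the flow has run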
I think I spotted the weirdest bug ever ^^" The subflow works when I pass run_flowAug a second argument that is a dict (for personal reasons). But it doesn't work if one of the values of the dict is a task (even though this "task" in the dict is not even used in the flows) :s If one of you guys has 10 min, I can explain it to you on Discord with screen sharing. The code is 100 lines long (nothing fancy).
I created a new post about it; the issue is bigger than just subflows, in my opinion 🙂