
    alexandre kempf

    2 years ago
    Hello guys! I'm wondering if it is possible to run a flow inside a flow. The main idea would be to have a flow (flowModel) that is responsible for training a model and, between the loading and the training steps, runs a flow for data preprocessing (flowPreproc). My main problem is that I'm passing a task to my second flow instead of a value (see the example). I can't think of a way to do it, and I don't know if I lack skills with Prefect or if this is bad practice. Thank you in advance! 🙂 Keep up the good work, your tool is amazing 🙂 Here is a minimal example:
    from prefect import task, Parameter, Flow
    import numpy as np
    
    @task
    def augment_data(a, b=10):
        return np.sqrt(a)+np.sqrt(b)
    
    with Flow("augment") as flowAug:
        a = Parameter("a")
        aug = augment_data(a, 10)
    
    @task
    def load_data(c):
        return c
    
    @task
    def train_model(model, d):
        print("Training, {} with {}!".format(model, d))
        return model
    
    
    with Flow("training") as flowModel:
        init = Parameter("c")
        model = Parameter("model")
        data = load_data(init)
        state = flowAug.run(a=data)
        aug_data = state.result[aug].result
        result = train_model(model, aug_data)
    
    state_model = flowModel.run(c=5, model="SVM")
    I read a bit more of the previous thread about nested flows and I realize that my case is a bit different. Just as functions in many languages have locals and globals, I would like to have the same in my flows. I'm trying something a bit crazy, but if there is an easy way I'm down for it 🙂

    josh

    2 years ago
    I'm not sure of an easy way to do what you're trying to accomplish, but is there a reason you aren't running your augment flow inside of a task? Something like:
    @task
    def run_flowAug(data):
        return flowAug.run(a=data)
    And then add that task to your training flow.

    Dylan

    2 years ago
    Hey @alexandre kempf, I'm interested in your intention here. Are you trying to share code between different flows? As Josh pointed out, it would be easy to turn one of your flows into a task or a series of dependent tasks. If you're trying to share functionality between flows, you could put the Python code for the tasks in a package or file and then import it into both flows.
    Basically, is this an elegance or a functionality question?
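As a rough illustration of the "share the task code" suggestion, here is a minimal sketch in plain Python. It assumes the step functions from the original example are defined once and reused by two pipelines. The pipeline names and the second model name are hypothetical, and `math.sqrt` stands in for `np.sqrt` to keep the sketch dependency-free. With Prefect, each function would presumably be wrapped with the `task` decorator, as in the original example.

```python
import math

# Shared steps, defined once. In a real project these would live in their own
# module (for example a hypothetical `augmentation.py`) and be imported by
# every flow that needs them.
def augment_data(a, b=10):
    return math.sqrt(a) + math.sqrt(b)

def load_data(c):
    return c

def train_model(model, d):
    return "Training {} with {}".format(model, d)

# Two independent pipelines (hypothetical names) reuse the same augmentation
# step without copy-pasting it.
def cat_dog_pipeline(c):
    return train_model("SVM", augment_data(load_data(c)))

def house_bridge_pipeline(c):
    return train_model("RandomForest", augment_data(load_data(c), b=4))
```

Because the augmentation lives in one place, a change to `augment_data` reaches both pipelines at once.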

    alexandre kempf

    2 years ago
    @josh This is what I'm trying to do right now. I'm trying to figure out how to reuse the result of this task afterwards. I have two versions of the code that are very similar; one works while the other crashes, and I'm currently investigating it! I might ask you some questions after I've done some debugging, if you're OK with that 🙂
    @Dylan You summarized the problem pretty well! I want to have subflows in order to reuse them. The perfect example is data augmentation. When I'm training a classification model on cat/dog images, for instance, I will use a Prefect flow to modify my dataset just before training. It should be a subflow because I want to run it on every epoch (I cannot do it once at the beginning of my training and then launch another flow). Another reason why I want it to be a subflow is that if next week I'm working on a house/bridge classifier, I want to be able to reuse the exact same augmentation, and thus just run this subflow again without recoding it or copy-pasting it. So those are the motivations 🙂 Now, in practice, I created a parser to transform YAML into flows automatically. The DAG is specified in the YAML and it works like a charm. I'm struggling to treat another YAML file as a task (it should not be hard in practice, as josh said). However, I think I'm getting confused with the result of the subflow. For example, if I do
    @task
    def sumsqrt(a, b):
        return np.sqrt(a)+np.sqrt(b)
    (Note that I'm using sqrt in order to avoid the add operation from Prefect, which can hide a bug if a and b are tasks and not floats or ints.) The value I return is not a task in my function definition, but in the flow it is. Therefore I tried something like
    @task
    def run_flowAug(data):
        state = flowAug.run(a=data)
        return state.result["aug"].result
    so I return something that is not a task. And in the main flow it is NOT considered a task. As you can see, I'm a bit lost 😛 But I don't want to take too much of your time, so I'll try to figure out what is wrong on my side and then ask more specific questions 🙂
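The remark above about using sqrt instead of a plain addition can be sketched as follows. This is only an illustration under stated assumptions: `FakeTask` is a hypothetical stand-in for any object that overloads `+` (it is not Prefect's `Task` class), the helper `accepts` is also hypothetical, and `math.sqrt` replaces `np.sqrt` to avoid a dependency.

```python
import math

class FakeTask:
    """Hypothetical stand-in for an object that overloads `+` (not Prefect's Task)."""
    def __init__(self, name):
        self.name = name

    def __add__(self, other):
        # Adding never fails: it silently builds a new deferred object.
        return FakeTask("add({}, {})".format(self.name, getattr(other, "name", other)))

def plain_sum(a, b):
    return a + b

def sum_sqrt(a, b):
    return math.sqrt(a) + math.sqrt(b)

def accepts(fn, *args):
    """Return True if fn(*args) runs, False if it raises TypeError."""
    try:
        fn(*args)
        return True
    except TypeError:
        return False
```

Here `accepts(plain_sum, FakeTask("a"), 1)` is true because the overloaded `+` hides the wrong argument type, while `accepts(sum_sqrt, FakeTask("a"), 1)` is false because `math.sqrt` rejects anything that is not a real number.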
    I think I spotted the weirdest bug ever ^^" The subflow works when I pass run_flowAug a second argument that is a dict (for personal reasons). But it doesn't work if one of the values of the dict is a task (even though this "task" in the dict is not even used in the flows) 😒. If one of you guys has 10 minutes, I can explain it to you on Discord with screen sharing. The code is 100 lines long (nothing fancy).
    I created a new post about it; in my opinion the issue is bigger than just subflows 🙂