
    alexandre kempf

    2 years ago
    Hello guys 🙂 Once again I need your (wonderful) customer service, but this time more for advice. I would like to have fully reproducible flows once they are saved on my disk. Right now I have the feeling that if a function A in my flow has behavior A and I save the flow, then change function A to have behavior B, reload the saved flow, and run it, I will get behavior B. Question 1: Is there an easy way to replay the flow exactly as it was when I saved it (with behavior A, even after I changed function A)? Question 2: If not, do you recommend any tool to do it? I feel like forcing the user to commit and saving the commit id is a bit too much for the user I'm targeting. Thanks in advance 🙂

    josh

    2 years ago
    @alexandre kempf Are you using the same name for your Flow for each iteration? e.g. does the flow which uses A have the same name as the flow which uses B? Second question: are you saving with flow.save or are you using the Cloud-based flow.register with a Local Storage option?

    alexandre kempf

    2 years ago
    @josh Thanks for your interest 🙂 I use the same name because I use flow.save and flow.load, so it is the exact same flow. It is just that between the 'save' and the 'load', I change the behavior of one task.
    And I'm using flow.save right now (not yet on the cloud).

    josh

    2 years ago
    When you call flow.save, the flow is serialized using cloudpickle and stored somewhere on your computer (you may optionally provide a file path 🙂). Then if you change the behavior of one of your tasks you should call flow.save again to update the stored flow.
    Calling flow.save with the same flow name will overwrite the old save. If you change the name then you can have multiple flows and load them up as you please.
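    For illustration, a minimal sketch of that save/load round trip with an explicit file path (assuming the Prefect 0.x flow.save(fpath) / Flow.load(fpath) signatures used elsewhere in this thread; the path below is made up):

    from prefect import Flow, task

    @task
    def hello():
        print("Hello World")

    with Flow("flowA") as flow:
        hello()

    # Save the cloudpickled flow to an explicit location
    # instead of the default ~/.prefect/flows/ directory
    flow.save("/tmp/flowa.prefect")

    # Reload from the same path later, even in a fresh Python session
    flow2 = Flow.load("/tmp/flowa.prefect")
    flow2.run()  # prints "Hello World"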

    alexandre kempf

    2 years ago
    I think I wasn't clear in explaining my problem, let me rephrase it.
    I would like to:
    • save a flow with a function A that returns 'Hello World', with flow.save("flowA.flow"), locally
    • modify my function A to return "Hello Josh" instead and save it
    • shut down my computer
    • reload the flow with flow.load("flowA.flow").run()
    • get "Hello World", because I saved my flow when the function was returning "Hello World"
    Another way to say it is to ask whether it is possible to save the tasks' code inside the .flow file. As I'm working on reproducibility, I want to be able to rerun a flow EXACTLY how I ran it 2 years ago, when my functions were very different 🙂
    Tell me if you have any questions concerning my explanation 🙂

    josh

    2 years ago
    That functionality should already work! Is this what you’re expecting?
    In [1]: from prefect import Flow, task
    
    In [2]: @task
       ...: def hello():
       ...:     print("Hello World")
       ...:
    
    In [3]: with Flow("flowA") as flow:
       ...:     hello()
       ...:
    
    In [4]: flow.run()
    # PRINTS "Hello World"
    
    In [5]: flow.save()
    Out[5]: '/Users/josh/.prefect/flows/flowa.prefect'
    
    In [6]: @task
       ...: def hello():
       ...:     print("Hello Alexandre")
       ...:
    
    In [7]: with Flow("flowA") as flow:
       ...:     hello()
       ...:
    
    In [8]: flow.run()
    # PRINTS "Hello Alexandre"
    
    In [9]: flow.save()
    Out[9]: '/Users/josh/.prefect/flows/flowa.prefect'
    # Overwrites the original
    
    In [10]: new_flow = flow.load(_)
    
    In [11]: new_flow.run()
    # PRINTS "Hello Alexandre"
    When you say "modify my function A to return 'Hello Josh' instead and save it", the saved flow, whenever loaded, will print "Hello Josh", and the original "Hello World" was overwritten since they both had the same Flow name.
    If you want the "Hello World" output that you saved originally then you have to change the name of the Flow (to something like "Flow B") before you save with "Hello Josh", so both options will be stored to disk (both "Flow A" and "Flow B"). Currently the Flow's name is the only versioning semantic for locally stored flows.
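    For illustration, a sketch of that renaming workaround, in the same Prefect 0.x style as the session above (task and flow names here are illustrative):

    from prefect import Flow, task

    @task
    def hello_world():
        print("Hello World")

    @task
    def hello_josh():
        print("Hello Josh")

    # Two differently named flows serialize to two separate files,
    # so both versions stay on disk and can be reloaded independently
    with Flow("flowA") as flow_a:
        hello_world()
    flow_a.save()  # -> ~/.prefect/flows/flowa.prefect

    with Flow("flowB") as flow_b:
        hello_josh()
    flow_b.save()  # -> ~/.prefect/flows/flowb.prefect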

    alexandre kempf

    2 years ago
    No, I'm not saving the flow with "Hello Josh"; I'm just saving the Python file that defines the function (I imported it as a module).
    In [1]: from prefect import Flow, task
    
    In [2]: @task
       ...: def hello():
       ...:     print("Hello World")
       ...:
    
    In [3]: with Flow("flowA") as flow:
       ...:     hello()
       ...:
    
    In [4]: flow.run()
    # PRINTS "Hello World"
    
    In [5]: flow.save()
    Out[5]: '/Users/josh/.prefect/flows/flowa.prefect'
    
    In [6]: @task
       ...: def hello():
       ...:     print("Hello Alexandre")
       ...:
    
    In [7]: flo = Flow.load('flowa.prefect')
    
    In [8]: flo.run()
    # PRINTS "Hello World"
    and I would like a "Hello World" here, because when I saved the flow, my function hello returned "Hello World" 🙂
    Wait two seconds @josh before replying, because the example I'm sending is not behaving the way my actual code does ^^"

    josh

    2 years ago
    👀

    alexandre kempf

    2 years ago
    I swear my brain is super confused!
    I don't know what I'm doing wrong, because apart from the part where I build the flow (which is not trivial), the save flow → modify function → load flow → run sequence is super trivial 😒
    Is it possible that the behavior changes if the function is part of a module that I import at the beginning of the script where I define the flow?
    Yep, that is the problem it seems @josh. So Prefect has the perfect behavior (to me) when the function is defined in the same script as the flow. However, this is not the case when the function is loaded from a package (in my case with a task(getattr(module, "function")) call).
    I just found:
    "your flow object is serialized with cloudpickle, which is then sent to your dask cluster. Therefore code in your module mytask is not serialized in the flow, it is stored as a reference. If you make sure that mytask is in your PYTHONPATH in each of your dask workers, defining tasks in modules should work."
    from @emre on another thread. Is that the reason? Can we force the serialization of the module inside the flow, or at least that of a few functions?
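    For illustration, a minimal sketch (outside Prefect) of the by-reference pickling described in that quote, assuming a hypothetical module mytask.py that defines hello():

    import cloudpickle
    import mytask  # hypothetical module on the PYTHONPATH

    blob = cloudpickle.dumps(mytask.hello)
    # hello is importable from a module, so cloudpickle stores only a
    # reference ("mytask.hello"), not the function's code. Unpickling
    # re-imports mytask and returns whatever code mytask.py holds *now*.
    fn = cloudpickle.loads(blob)
    fn()

    # By contrast, functions defined in __main__ / an interactive session
    # have no importable home, so cloudpickle serializes them by value --
    # which is why the in-session example above reproduced the saved behavior.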

    josh

    2 years ago
    I don’t recall the exact reason why that is the case, but it’s a core piece of how pickling works: it doesn’t serialize imports. cc @Chris White

    Chris White

    2 years ago
    Yea, this behavior is built into core Python, so it can’t really be changed. To manage your Flows at this level of granularity, I’d recommend using Docker, which will preserve all of your imports as well

    alexandre kempf

    2 years ago
    @Chris White what do you mean? If I have 2 scripts in a Docker container (one to define my tasks, one to define my flow), will the script with the tasks also be serialized in the flow? Or does the flow launch the Docker container?

    Chris White

    2 years ago
    In essence you are seeking to version the runtime environment for your Flow, which includes things like how imports behave; you can do this either through virtual environments or by placing your Flow and all its dependencies inside a Docker container, and Docker containers are much easier to version
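    For illustration, a hedged sketch of that Docker suggestion using Prefect 0.x's Docker storage (the registry URL, file paths, and dependency below are placeholders; check prefect.environments.storage for the exact options):

    from prefect import Flow
    from prefect.environments.storage import Docker

    # Bake the flow AND the module its tasks import into one image, so the
    # runtime environment is frozen and versioned alongside the pickled flow
    storage = Docker(
        registry_url="registry.example.com",            # placeholder
        files={"/local/path/mytask.py": "/mytask.py"},  # ship the tasks module
        env_vars={"PYTHONPATH": "/"},                   # make mytask importable
        python_dependencies=["numpy"],                  # placeholder dependency
    )

    with Flow("flowA", storage=storage) as flow:
        ...  # build the flow from tasks imported from mytask

    flow.register()  # with Prefect Cloud/Server: builds and pushes the image,
                     # and the image tag versions the flow's whole environment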