
alexandre kempf

01/13/2020, 3:53 PM
Hello guys 🙂 Once again I need your (wonderful) customer service, but this time more for advice. I would like to have fully reproducible flows once they are saved on my disk. Right now I have the feeling that if a function A in my flow has behavior A and I save the flow, then change function A to have behavior B, reload the saved flow, and run it, I will get behavior B. Question 1: is there an easy way to replay the flow exactly as it was when I saved it (with behavior A, even after I changed function A)? Question 2: if not, do you recommend any tool to do it? Forcing the user to commit and save the commit id feels like too much for the users I'm targeting. Thanks in advance 🙂

josh

01/13/2020, 3:59 PM
@alexandre kempf Are you using the same name for your Flow for each iteration? e.g. does the flow which uses A have the same name as the flow which uses B? Second question: are you saving with `flow.save` or are you using the Cloud-based `flow.register` with a Local Storage option?

alexandre kempf

01/13/2020, 4:25 PM
@josh Thanks for your interest 🙂 I use the same name because I use `flow.save` and `flow.load`, so it's the exact same flow. It's just that between the save and the load, I change the behavior of one task
And I'm using `flow.save` for now (not yet on the cloud)

josh

01/13/2020, 4:27 PM
When you call `flow.save` the flow is serialized using cloudpickle and stored somewhere on your computer (you may optionally provide a file path 🙂). Then if you change the behavior of one of your tasks you should call `flow.save` again to update the stored flow
Calling `flow.save` with the same flow name will overwrite the old save. If you change the name then you can have multiple flows and load them up as you please

alexandre kempf

01/13/2020, 4:28 PM
I don't think I explained my problem clearly, so let me rephrase it.
👍 1
I would like to:
• save a flow with a function A that returns "Hello World" with `flow.save("flowA.flow")` locally
• modify my function A to return "Hello Josh" instead and save it
• shut down my computer
• reload the flow with `flow.load("flowA.flow").run()`
• get "Hello World", because I saved my flow when the function was returning "Hello World"
Another way to say it: is it possible to save the task's code inside the .flow file? As I'm working on reproducibility, I want to be able to rerun a flow EXACTLY how I ran it 2 years ago, when my functions were very different 🙂
Tell me if you have any questions about my explanation 🙂

josh

01/13/2020, 4:41 PM
That functionality should already work! Is this what you’re expecting?
In [1]: from prefect import Flow, task

In [2]: @task
   ...: def hello():
   ...:     print("Hello World")
   ...:

In [3]: with Flow("flowA") as flow:
   ...:     hello()
   ...:

In [4]: flow.run()
# PRINTS "Hello World"

In [5]: flow.save()
Out[5]: '/Users/josh/.prefect/flows/flowa.prefect'

In [6]: @task
   ...: def hello():
   ...:     print("Hello Alexandre")
   ...:

In [7]: with Flow("flowA") as flow:
   ...:     hello()
   ...:

In [8]: flow.run()
# PRINTS "Hello Alexandre"

In [9]: flow.save()
Out[9]: '/Users/josh/.prefect/flows/flowa.prefect'
# Overwrites the original

In [10]: new_flow = flow.load(_)

In [11]: new_flow.run()
# PRINTS "Hello Alexandre"
When you say "modify my function A to return 'Hello Josh' instead and save it", then the saved flow, whenever loaded, will print "Hello Josh", and the original "Hello World" was overwritten since both had the same Flow name
If you want the "Hello World" output that you saved originally then you have to change the name of the Flow (to something like `Flow B`) before you save with "Hello Josh", so both options will be stored to disk (both `Flow A` and `Flow B`). Currently the Flow's name is the only versioning semantic for locally stored flows
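Since the flow name is the only versioning handle for local storage, one lightweight workaround (a sketch, not part of Prefect's API — the `versioned_name` helper is made up) is to derive the save name from the task's code, so each behavioral change of the task lands in a distinct file instead of overwriting the previous one:

```python
import hashlib

def versioned_name(base, fn):
    # Hash the task's bytecode and constants so each behavioral
    # change to the task yields a distinct flow name (and file).
    code = fn.__code__
    digest = hashlib.sha1(
        code.co_code + repr(code.co_consts).encode()
    ).hexdigest()[:8]
    return f"{base}-{digest}"

def hello():
    return "Hello World"

name_a = versioned_name("flowA", hello)  # e.g. "flowA-<hash of A>"

def hello():
    return "Hello Josh"

name_b = versioned_name("flowA", hello)  # different hash, different name
```

Passing `versioned_name(...)` as the save path would keep both versions on disk side by side; the hash only covers the task's own code, not anything it imports.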

alexandre kempf

01/13/2020, 4:50 PM
No, I'm not saving the flow with Hello Josh; I'm just saving the Python file that defines the function (I imported it as a module).
In [1]: from prefect import Flow, task

In [2]: @task
   ...: def hello():
   ...:     print("Hello World")
   ...:

In [3]: with Flow("flowA") as flow:
   ...:     hello()
   ...:

In [4]: flow.run()
# PRINTS "Hello World"

In [5]: flow.save()
Out[5]: '/Users/josh/.prefect/flows/flowa.prefect'

In [6]: @task
   ...: def hello():
   ...:     print("Hello Alexandre")
   ...:

In [7]: flo = Flow.load('flowa.prefect')

In [8]: flo.run()
# PRINTS "Hello World"
And I would like "Hello World" here, because when I saved the flow my function hello returned "Hello World" 🙂
Wait two seconds before replying @josh, because the example I'm sending doesn't behave the way my actual code does ^^"

josh

01/13/2020, 4:56 PM
👀

alexandre kempf

01/13/2020, 5:09 PM
I swear my brain is super confused!
I don't know what I'm doing wrong, because apart from the part where I build the flow, which is not trivial, the save flow / modify function / load flow / run sequence is super trivial :s
Is it possible that the behavior changes if the function is part of a module that I import at the beginning of the script where I define the flow?
Yep, that seems to be the problem @josh. So prefect has the perfect behavior (to me) when the function is defined in the same script as the flow. However, that's not the case when the function is loaded from a package (in my case with a `task(getattr(module, "function"))` call).
I just found this from @emre on another thread:
"your flow object is serialized with cloudpickle, which is then sent to your dask cluster. Therefore code in your module mytask is not serialized in the flow, it is stored as a reference. If you make sure that mytask is in your PYTHONPATH in each of your dask workers, defining tasks in modules should work."
Is that the reason? Can we force the serialization of the module inside the flow, or at least of a few functions?
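Prefect didn't expose a switch for this at the time, but as a sketch of what "serializing by value" means, you can snapshot a function's compiled bytecode yourself with the standard library (`marshal` is roughly what cloudpickle uses under the hood for functions defined in `__main__`; the snippet is illustrative, not a Prefect feature):

```python
import marshal
import types

def hello():
    return "Hello World"

# Capture the function's compiled bytecode *by value*, the way
# cloudpickle does for functions defined interactively.
blob = marshal.dumps(hello.__code__)

def hello():                      # redefine: the behavior changes
    return "Hello Josh"

# Rebuild the old function from the snapshot; it is independent of
# the current definition of hello.
saved = types.FunctionType(marshal.loads(blob), globals(), "hello")

saved()   # still returns "Hello World"
hello()   # returns "Hello Josh"
```

Note that `marshal` output is specific to the Python version that produced it, which is one reason by-value serialization alone isn't enough for long-term reproducibility.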

josh

01/13/2020, 5:41 PM
I don't recall the exact reason why that is the case, but it's core to how pickling works: it doesn't serialize imports. cc @Chris White
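A small standard-library demonstration of this: pickling a module-level function stores only the module and attribute name, and unpickling re-imports the module and looks the name up again, so any edits made to the module between dump and load are what you get back.

```python
import json
import pickle

# Pickling a function that lives in a module records a reference
# ("json", "dumps"), not the function's code.
payload = pickle.dumps(json.dumps)

# The pickle stream literally contains just the two names:
assert b"json" in payload and b"dumps" in payload

# Unpickling imports json again and returns whatever json.dumps
# is *now*, not a copy frozen at dump time.
assert pickle.loads(payload) is json.dumps
```

This is exactly why a `.flow` file saved with a task imported from a module re-reads the module's current code on load.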

Chris White

01/13/2020, 5:46 PM
Yea, this behavior is built into core Python so it can't really be changed. To manage your Flows at this level of granularity, I'd recommend using Docker, which will preserve all of your imports as well

alexandre kempf

01/13/2020, 5:54 PM
@Chris White what do you mean? If I have 2 scripts in a docker (one to define my tasks, one to define my flow), will the script with the tasks also be serialized in the flow? Or does the flow launch the docker?

Chris White

01/13/2020, 7:05 PM
In essence you are seeking to version the runtime environment for your Flow, which includes things like how imports behave; you can do this either through virtual environments or by placing your Flow and all its dependencies inside a Docker container, and Docker containers are much easier to version
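A middle ground between doing nothing and building a full Docker image (a sketch; the helper name and file name are made up) is to freeze the installed package versions next to each saved flow, so the environment can at least be approximately rebuilt later with `pip install -r flow_requirements.txt`:

```python
import pathlib
import subprocess
import sys
import tempfile

def save_environment(directory):
    """Write the exact installed package versions alongside a saved
    flow, so the runtime environment can be rebuilt later."""
    frozen = subprocess.run(
        [sys.executable, "-m", "pip", "freeze"],
        capture_output=True, text=True, check=True,
    ).stdout
    path = pathlib.Path(directory) / "flow_requirements.txt"
    path.write_text(frozen)
    return path

# Demo: write the freeze file into a temporary directory.
req_file = save_environment(tempfile.mkdtemp())
```

This captures Python package versions only; it doesn't pin the Python interpreter or system libraries, which is where a Docker image is strictly stronger.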