alexandre kempf
01/13/2020, 3:53 PM

josh
01/13/2020, 3:59 PM
Are you using flow.save, or are you using the Cloud-based flow.register with a Local Storage option?
alexandre kempf
01/13/2020, 4:25 PM

josh
01/13/2020, 4:27 PM
When you call flow.save, the flow is serialized using cloudpickle and stored somewhere on your computer (you may optionally provide a file path 🙂). Then if you change the behavior of one of your tasks you should call flow.save again to update the stored flow. Calling flow.save with the same flow name will overwrite the old save. If you change the name then you can have multiple flows and load them up as you please.
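A minimal sketch of the save/overwrite behaviour josh describes, assuming the local persistence API used later in this thread (flow.save returns the path it wrote, Flow.load reads a path back) and that the optional file path is accepted as the first argument to flow.save:

from prefect import Flow, task

@task
def hello():
    print("Hello World")

with Flow("flowA") as flow:
    hello()

# With no argument the flow is cloudpickled to the default location
# (~/.prefect/flows/<slugified-flow-name>.prefect) and that path is returned.
default_path = flow.save()

# An explicit file path can be supplied instead (assumed positional here).
custom_path = flow.save("/tmp/flowa.prefect")

# Saving again under the same flow name rewrites the same default file;
# loading simply deserializes whatever was last written to the given path.
restored = Flow.load(default_path)
restored.run()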
alexandre kempf
01/13/2020, 4:28 PM

josh
01/13/2020, 4:41 PM
In [1]: from prefect import Flow, task
In [2]: @task
   ...: def hello():
   ...:     print("Hello World")
   ...:
In [3]: with Flow("flowA") as flow:
   ...:     hello()
   ...:
In [4]: flow.run()
# PRINTS "Hello World"
In [5]: flow.save()
Out[5]: '/Users/josh/.prefect/flows/flowa.prefect'
In [6]: @task
   ...: def hello():
   ...:     print("Hello Alexandre")
   ...:
In [7]: with Flow("flowA") as flow:
   ...:     hello()
   ...:
In [8]: flow.run()
# PRINTS "Hello Alexandre"
In [9]: flow.save()
Out[9]: '/Users/josh/.prefect/flows/flowa.prefect'
# Overwrites the original
In [10]: new_flow = flow.load(_)
In [11]: new_flow.run()
# PRINTS "Hello Alexandre"
You could change the flow's name (e.g. to Flow B) before you save with "Hello Josh" so both options will be stored to disk (both Flow A and Flow B). Currently the Flow's name is the only versioning semantic for locally stored flows.
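A rough sketch of that renaming idea, under the same assumptions as the session above (flow.save writes to ~/.prefect/flows/<flow-name>.prefect and returns the path; Flow.load takes a path):

from prefect import Flow, task

@task
def hello():
    print("Hello Josh")

# Give the modified flow a different name so it lands in a new file
# instead of overwriting flowa.prefect.
with Flow("flowB") as flow:
    hello()

path_b = flow.save()   # e.g. '/Users/josh/.prefect/flows/flowb.prefect'

# Both files now exist on disk and can be loaded independently.
flow_b = Flow.load(path_b)
flow_b.run()           # prints "Hello Josh"
flow_a = Flow.load('/Users/josh/.prefect/flows/flowa.prefect')  # the earlier save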
alexandre kempf
01/13/2020, 4:50 PM
In [1]: from prefect import Flow, task
In [2]: @task
   ...: def hello():
   ...:     print("Hello World")
   ...:
In [3]: with Flow("flowA") as flow:
   ...:     hello()
   ...:
In [4]: flow.run()
# PRINTS "Hello World"
In [5]: flow.save()
Out[5]: '/Users/josh/.prefect/flows/flowa.prefect'
In [6]: @task
   ...: def hello():
   ...:     print("Hello Alexandre")
   ...:
In [7]: flo = Flow.load('flowa.prefect')
In [8]: flo.run()
# PRINTS "Hello World"
josh
01/13/2020, 4:56 PM

alexandre kempf
01/13/2020, 5:09 PM
"your flow object is serialized with cloudpickle, which is then sent to your dask cluster. Therefore code in your module mytask is not serialized in the flow, it is stored as a reference. If you make sure that mytask is in your PYTHONPATH in each of your dask workers, defining tasks in modules should work."
from @emre on another thread. Is that the reason? Can we force the serialization of the module inside the flow, or at least that of a few functions?
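For context on that quote: cloudpickle only embeds a function's code when it cannot import it by name, i.e. when the function is defined in __main__ or an interactive session; anything that lives in an importable module is stored as a bare module/attribute reference. A small illustration independent of Prefect (json.dumps stands in for a function from a user module such as mytask):

import cloudpickle

# A function importable from a module is pickled *by reference*: only the
# module and attribute names go into the payload, so that module must be
# importable (on PYTHONPATH) wherever the payload is unpickled.
import json   # stand-in for a user module such as mytask
by_reference = cloudpickle.dumps(json.dumps)

# A function defined in __main__ (or a notebook / IPython session) is
# pickled *by value*: its code object travels inside the payload itself.
def hello():
    print("Hello World")

by_value = cloudpickle.dumps(hello)

print(len(by_reference))   # small: essentially just the names 'json' and 'dumps'
print(len(by_value))       # larger: includes the function's bytecode

So the two usual workarounds are to define the tasks in the script or session that builds the flow (so they are pickled by value) or, as emre suggests, to make the module importable on every worker.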
josh
01/13/2020, 5:41 PM

Chris White
01/13/2020, 5:46 PM

alexandre kempf
01/13/2020, 5:54 PM

Chris White
01/13/2020, 7:05 PM