Hui Zheng
01/05/2021, 9:08 PM

Jim Crist-Harif
01/05/2021, 9:11 PM

Hui Zheng
01/05/2021, 9:13 PM
my_flow.py
which contains only the task and flow definitions:

from prefect import task, Flow

@task
def task_1():
    ...

@task
def task_2():
    ...

with Flow("my_flow") as flow:
    ...
local-test.py
from prefect.utilities.debug import raise_on_exception
from my_flow import flow

with raise_on_exception():
    fetch_results = flow.run()
...
build_and_register.py
from prefect.schedules import Schedule
from prefect.storage import Docker
from my_flow import flow

flow.schedule = Schedule(...)
flow.storage = Docker(...)

if push_to_cloud:
    flow.register(...)
else:
    # build for local test
    flow.storage.build(push=False)
Jim Crist-Harif
01/05/2021, 9:22 PM
• I recommend using a storage class other than the Docker storage class, and specifying a fixed docker image as part of the flow's run_config. This makes it easier to separate the deployment environment from the flow code itself (since the flow code is stored external to the image).
• I recommend defining and registering a flow in the same file, something like:

with Flow(...) as flow:
    ...

if __name__ == "__main__":
    flow.register(...)
If the flow is imported (say in your tests or as part of other code) it won't be registered, but running the flow file as a script will still register it. Since your flow's module is then __main__ when registered, cloudpickle will successfully serialize your flow when using pickle-based storage. If you're storing as a python file (stored_as_script=True) then this doesn't matter.

Pickle-based storage relies on cloudpickle, which fully serializes all function definitions written in the __main__ module. Functions written in other modules (say imported from a different file) are only serialized by reference (e.g. mymodule.myfunction), meaning that the source for those functions must already be available in the deserialization environment.
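The if __name__ == "__main__" registration guard above can be demonstrated with the standard library alone, no Prefect required; here runpy stands in for both "run as a script" and "imported as a module" (the file contents are a stand-in for a real flow file):

```python
import pathlib
import runpy
import tempfile
import textwrap

# A stand-in for a flow file: "registration" only happens under the guard.
src = textwrap.dedent("""
    registered = False
    if __name__ == "__main__":
        registered = True
""")

with tempfile.TemporaryDirectory() as d:
    path = pathlib.Path(d) / "my_flow.py"
    path.write_text(src)
    # Executed as a script (__name__ == "__main__"): the guard fires.
    as_script = runpy.run_path(str(path), run_name="__main__")
    # Loaded under its own module name (like an import): the guard does not fire.
    as_module = runpy.run_path(str(path), run_name="my_flow")

assert as_script["registered"] is True
assert as_module["registered"] is False
```

This is why the pattern is safe to import from tests while still registering when run directly.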
In Prefect's case, this usually comes up when dealing with user-written tasks. If you write all your tasks in the same file you call flow.register in, then run that as a script, all your task definitions will be stored in the Storage class. If you write them in other files, those files will need to be installed in your execution environment (e.g. a docker image) for things to work properly.
However, if you set stored_as_script=True on your Storage class, the flow will be loaded from a .py file again on execution, which avoids this issue (but for most storage classes puts the burden of getting the .py file to that location on you). See https://docs.prefect.io/core/idioms/file-based.html for more info.

Hui Zheng
01/05/2021, 9:30 PM
Because our tasks are defined in the my_flow.py file, which is a different file from the one where we run register(), cloudpickle couldn't get some class-object global variables, such as local_tz = pytz.timezone(tz) in my_flow.py.
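One common workaround for this class of problem (an illustration only, not necessarily the fix Jim suggests next) is to resolve such values inside the function body rather than as a module-level global, so nothing beyond the function reference needs to exist at unpickle time. A sketch using the stdlib zoneinfo (Python 3.9+) in place of pytz; stamp is a hypothetical helper name:

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # stdlib analogue of pytz (Python 3.9+)

def stamp(tz_name: str = "America/Los_Angeles") -> str:
    # Resolve the timezone inside the function instead of relying on a
    # module-level global like `local_tz`, so the deserialization
    # environment only needs the function itself to be importable.
    local_tz = ZoneInfo(tz_name)
    return datetime(2021, 1, 5, 21, 8, tzinfo=local_tz).isoformat()

print(stamp())  # 2021-01-05T21:08:00-08:00 (PST in January)
```

The alternative, as discussed above, is to define the tasks in the registering script itself or switch to file-based storage.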
Jim Crist-Harif
01/05/2021, 9:59 PM