# prefect-community
h
Hello, I would like to know the best way to organize the code for setting up a development workflow for a Prefect flow. I would like to do the following in my workflow:
1. define/update a flow
2. test-run my flow (built in Docker storage) with a local executor
3. when step 2 passes, register the same flow (built in Docker storage) to Prefect Cloud

Is there any best practice or code example? We currently also use another Docker container to set up the local-test/build environment. Is there a better approach?
j
Hi Hui, can you be more specific? Are you looking for how to develop a flow locally in such a workflow? Or how to automate the testing/registration of a flow in CI?
h
I think it’s for both. I wonder what’s the best way to organize the flow definition code alongside the flow execution code (local-test, local-run, and register-to-prefect-cloud).
Currently, I have a Python file named `my_flow.py` which contains only the task and flow definitions.
```python
from prefect import Flow, task

@task
def task_1():
    ...

@task
def task_2():
    ...

with Flow("my_flow") as flow:
    ...
```
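and then I have another file named `local-test.py`:
```python
from prefect.utilities.debug import raise_on_exception

from my_flow import flow

# Re-raise task exceptions immediately instead of trapping them in a Failed state.
with raise_on_exception():
    fetch_results = flow.run()
...
```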
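and I have one more file named `build_and_register.py`:
```python
# Import paths assume Prefect 0.14+/1.x.
from prefect.schedules import Schedule
from prefect.storage import Docker

from my_flow import flow

flow.schedule = Schedule(...)
flow.storage = Docker(...)
if push_to_cloud:
    flow.register(...)
else:
    # build for local test
    flow.storage.build(push=False)
```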
j
We don't currently have a guide for this (other users in this Slack might have some good tips). My shortlist is:
• When possible, I recommend making your flow Docker-agnostic. This makes it easier to test and develop your flow without requiring images to be built or a local Docker setup running.
• Likewise, if it makes sense, I recommend using a non-`Docker` storage class and specifying a fixed Docker image as part of the flow's `run_config`. This makes it easier to separate the deployment environment from the flow code itself (since the flow code is stored external to the image).
• I recommend defining and registering a flow in the same file, something like:
```python
from prefect import Flow

with Flow(...) as flow:
    ...

if __name__ == "__main__":
    flow.register(...)
```
If the flow is imported (say in your tests or as part of other code) it won't be registered, but running the flow file as a script will still register it. Since your flow's module is then `__main__` when registered, cloudpickle will successfully serialize your flow when using pickle-based storage. If you're storing the flow as a Python file (`stored_as_script=True`), then this doesn't matter.
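The import-versus-script behaviour can be checked without Prefect at all. A minimal sketch, where `register()` is a hypothetical stand-in for `flow.register(...)` and the flow file is written to a temporary directory purely for illustration:

```python
import importlib.util
import runpy
import tempfile
from pathlib import Path

# Hypothetical flow file; register() stands in for flow.register(...).
SOURCE = '''
calls = []

def register():
    calls.append("registered")

if __name__ == "__main__":
    register()
'''

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "my_flow.py"
    path.write_text(SOURCE)

    # Importing the file (as a test suite would) does not trigger the guard.
    spec = importlib.util.spec_from_file_location("my_flow", path)
    imported = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(imported)

    # Running it as a script sets __name__ to "__main__", so the guard fires.
    script_globals = runpy.run_path(str(path), run_name="__main__")

print(imported.calls)           # nothing registered on import
print(script_globals["calls"])  # registered when run as a script
```

The same file can therefore be imported by tests and executed directly to register, without a separate registration script.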
The last point may be a bit confusing. By default Prefect serializes a flow using `cloudpickle`, which fully serializes all function definitions written in the `__main__` module. Functions written in other modules (say imported from a different file) are only serialized by reference (e.g. `mymodule.myfunction`), meaning that the source for those functions must already be available in the deserialization environment.
In Prefect's case, this usually comes up when dealing with user-written tasks. If you write all your tasks in the same file you call `flow.register` in, then run that as a script, all your task definitions will be stored in the `Storage` class. If you write them in other files, those files will need to be installed in your execution environment (e.g. a Docker image) for things to work properly. However, if you set `stored_as_script=True` on your `Storage` class, the flow will be loaded from a `.py` file again on execution, which avoids this issue (but for most storage classes puts the burden of getting the `.py` file to that location on you). See https://docs.prefect.io/core/idioms/file-based.html for more info.
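To make the by-reference behaviour concrete, here is a minimal sketch that uses the standard library's `pickle` as a stand-in for `cloudpickle`. Both store functions from importable modules by reference; `cloudpickle` additionally stores `__main__` functions by value, which this sketch does not show. The `mymodule` built here is an in-memory placeholder, not a real package.

```python
import pickle
import sys
import types

# Build a throwaway in-memory module named "mymodule" (hypothetical stand-in
# for a separate file that defines a task function).
mymodule = types.ModuleType("mymodule")
exec("def myfunction(x):\n    return x + 1\n", mymodule.__dict__)
sys.modules["mymodule"] = mymodule

# Pickling the function records only where to find it, not its source.
payload = pickle.dumps(mymodule.myfunction)
print(b"mymodule" in payload, b"myfunction" in payload)

# Where the module is importable, loading just looks the function up again.
same_function = pickle.loads(payload) is mymodule.myfunction

# Simulate an execution environment where the module was never installed.
del sys.modules["mymodule"]
try:
    pickle.loads(payload)
    load_error = None
except ImportError as exc:  # ModuleNotFoundError is a subclass of ImportError
    load_error = exc
print(type(load_error).__name__)
```

This is the failure mode that installing the extra files in the execution environment, or using `stored_as_script=True`, is meant to avoid.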
h
yes, that’s right. thank you for the last point
I think we ran into some of these strange issues.
Because our flow and task definitions are written in `my_flow.py`, which is a different file from the one where we run `register()`, `cloudpickle` couldn’t pick up some global variables that hold class objects, such as `local_tz = pytz.timezone(tz)` in `my_flow.py`.
j
Yeah, that sounds like something caused by the flow being defined in a separate file.
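One way to see how a module-level global can go missing: when a function is pickled by reference, its module's globals are not part of the payload; they are looked up again wherever the pickle is loaded. A minimal sketch with the standard library's `pickle` and an in-memory stand-in for `my_flow.py`. The string value replaces the `pytz` timezone to keep the example dependency-free, and whether this is the exact cause of the issue above is an assumption.

```python
import pickle
import sys
import types

# In-memory stand-in for my_flow.py: one global and one function that reads it.
my_flow = types.ModuleType("my_flow")
exec(
    "local_tz = 'Europe/Berlin'\n"
    "def current_tz():\n"
    "    return local_tz\n",
    my_flow.__dict__,
)
sys.modules["my_flow"] = my_flow

payload = pickle.dumps(my_flow.current_tz)
# The global's value is not stored alongside the function reference.
value_in_payload = b"Europe/Berlin" in payload

# Simulate a different environment: the module exists but the global differs.
my_flow.local_tz = "UTC"
restored = pickle.loads(payload)
print(value_in_payload, restored())
```

Whatever the execution environment's copy of the module defines at load time is what the function sees, which is why keeping tasks and their globals in the file that is run as `__main__` sidesteps the problem.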