# prefect-community
g
Hi folks, does anyone have any proof-of-concept examples of a project where there are multiple flows within the same project directory and some shared logic which the flows can import? I can get importing to work when running the flow locally with `flow.run()`, but registering the flow on Prefect Server and triggering the flow from there fails because of module errors. I guess I'm just misunderstanding Python's importing logic (I'm by no means an expert...), so a basic working example that I can build on would really help me get off the ground.
An example of what I mean, using the following structure:
my_project/
├── README.txt
├── __init__.py
├── requirements.txt
├── setup.py
├── etl
│   ├── __init__.py
│   ├── foo
│   │   ├── __init__.py
│   │   ├── foo_flow.py
│   │   ├── config
│   │   │   ├── __init__.py
│   │   │   └── foo_config.py
│   ├── shared
│   │   ├── __init__.py
│   │   └── sftp_utils.py
│   └── bar
│       ├── __init__.py
│       ├── bar_flow.py
│       ├── config
│       │   ├── bar_config.py
│       └── test
│           ├── __init__.py
│           └── test_bar_flow.py
├── .venv
├── start_agent.bat
I want to have two flows, `foo_flow.py` and `bar_flow.py`, each of which uses some common logic imported from `sftp_utils.py`, and have one agent launched from `start_agent.bat` which can trigger both of the flows. My main pain points are:
• what to put into `setup.py` so that running `pip install -e .` in the root dir allows scripts to import from each other
• what importing syntax to use in (e.g.) `foo_flow.py` to get logic from `sftp_utils.py`
Currently when I register the flow with Prefect Server, then start the local agent with `--show-flow-logs`, the CLI spams error messages, starting with `ModuleNotFoundError: No module named 'setuptools._distutils'` and then repeating this over and over:
tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: None, threads: 4>>
Traceback (most recent call last):
  File "c:\my_project\.venv\lib\site-packages\tornado\ioloop.py", line 907, in _run
    return self.callback()
  File "c:\cmy_project\.venv\lib\site-packages\distributed\nanny.py", line 414, in memory_monitor
    process = self.process.process
AttributeError: 'NoneType' object has no attribute 'process'
a
Simplest way would be by making your modules available in `$PYTHONPATH` so that they are visible to the flows when they run on Prefect Server
z
Hi @Greg Roche, what flow storage method are you using for registration?
g
Hi Michael, just the standard Local storage method, nothing fancy
z
Great, I’ve actually only helped people debug this with Docker storage because I’m new, but I’ll get a minimal case set up 🙂 I’ll get back to you soon
g
Thanks very much in advance for your help 🙂
z
So here’s a very minimal example that is working on my system: https://github.com/madkinsz/prefect-flow-register
When using local storage it’s important to note that the agent must be running in a Python environment with the same packages as your `flow.run()` environment, e.g. I ran the agent in a Python environment where I had run `pip install -e .` with my module. Using Docker storage would allow you to set up an environment alongside your flow, so I recommend doing that in production, but it depends on your setup!
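For reference, a minimal `setup.py` along these lines should be enough to make the `etl` package importable after `pip install -e .` in the project root; the package name and version below are placeholders, not values taken from the thread or the linked repo:

```python
# setup.py (project root) - minimal sketch; name and version are placeholders
from setuptools import find_packages, setup

setup(
    name="my_project",
    version="0.1.0",
    # pick up etl/ and every sub-package that has an __init__.py
    packages=find_packages(include=["etl", "etl.*"]),
)
```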
@Greg Roche when you get a chance let me know if this helped! It’d be a nice thing to flesh out for other users if that is the case.
g
thanks so much for your help Michael! I will definitely take a look at this today and let you know 🙂 (sorry for the delay, I'm based in the EU)
Michael, your example has helped out so much, thanks very much for taking the time to put it together! In case it matters for anyone else coming across this, it seems like my main mistake was to have the logic for registering (e.g.) the `foo` flow in `foo_flow.py`, then running `python etl\foo\foo_flow.py` from the root folder to register the flow in Prefect Server, rather than having a `register.py` file in the project root folder to handle the flow registration. In addition, I didn't have my `setup.py` file configured correctly, because I hadn't specified the `etl` folder as a package. This meant I had to resort to PYTHONPATH hacks and other silliness to try to import from sibling folders; this way is so much cleaner, more elegant, and extensible. Michael, please let me know a way to buy you a couple of beers remotely as thanks :D
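As a sketch of that layout (illustrative only, not the code from the linked repo): a `register.py` in the project root can import and register both flows, while the flow modules pull in the shared helpers through the installed `etl` package. The flow variable name and project name below are assumptions:

```python
# register.py (project root) - illustrative sketch; assumes each flow module
# exposes a prefect.Flow object named `flow` and that the "my_project"
# project already exists on the server
from etl.foo.foo_flow import flow as foo_flow
from etl.bar.bar_flow import flow as bar_flow

for flow in (foo_flow, bar_flow):
    flow.register(project_name="my_project")
```

And inside a flow module the shared logic is imported through the package rather than via sibling-path tricks (`fetch_files` is a hypothetical stand-in for whatever `sftp_utils.py` actually exposes):

```python
# etl/foo/foo_flow.py - import shared helpers via the installed etl package
from etl.shared.sftp_utils import fetch_files  # hypothetical helper name
```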
z
Great to hear! Hopefully we can get some examples and a blog post out soon on our recommended way to structure projects with flows in them.
d
Agree, this should be in a blog post, or shared somewhere, this was just the thing I was missing 👏 🎉
s
@Zanie @Dave @Greg Roche Great insights! Is it possible to comment further and share code on how to add multiple flows to Docker storage? (Let's say I have multiple flows spread out in the repo and wish to register them.) Could you also extend this code with a schedule in place, to show how these flows can run from different places in the repo, either in the same example or another one where we can see this in action?
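Not an answer from the thread, but as a rough sketch of the pattern being asked about here (legacy Prefect < 2.0 API; module paths, registry URL, and cron strings are all placeholders, and the exact `Docker` storage arguments may differ between versions): several flows can be added to one Docker storage object, given schedules, and registered from a single script:

```python
# register_flows.py - rough sketch only; every name and path is a placeholder
from prefect.schedules import CronSchedule
from prefect.storage import Docker

# hypothetical module paths for two flows living in different parts of the repo
from prefectflows.etl.etl_flow import flow as etl_flow
from prefectflows.mr.mr_flow import flow as mr_flow

# attach a schedule to each flow before registering it
etl_flow.schedule = CronSchedule("0 6 * * *")    # daily at 06:00
mr_flow.schedule = CronSchedule("0 18 * * *")    # daily at 18:00

storage = Docker(
    registry_url="my-registry.example.com",      # placeholder registry
    python_dependencies=["paramiko"],            # whatever the flows need
)

# bake both flows into the same image, then build it once
for flow in (etl_flow, mr_flow):
    storage.add_flow(flow)
storage = storage.build()

# register both flows against the already-built image
for flow in (etl_flow, mr_flow):
    flow.storage = storage
    flow.register(project_name="my-project", build=False)
```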
@Zanie I am getting this error; I believe it's because I am unable to import the flows folder as part of my Docker build. What should I be doing as part of my Docker build process?
Step 6/10 : RUN mkdir -p /opt/prefect/
 ---> Using cache
 ---> 9b9f5132ea1a
Step 7/10 : COPY z-etl-flow.flow /opt/prefect/flows/z-etl-flow.prefect
 ---> fb8c3ce3bda9
Step 8/10 : COPY z-mr-flow.flow /opt/prefect/flows/z-mr-flow.prefect
 ---> 08102086f4ef
Step 9/10 : COPY healthcheck.py /opt/prefect/healthcheck.py
 ---> 725cca0c7872
Step 10/10 : RUN python /opt/prefect/healthcheck.py '["/opt/prefect/flows/z-etl-flow.prefect", "/opt/prefect/flows/z-mr-flow.prefect"]' '(3, 7)'
 ---> Running in 0edd438d422e
Beginning health checks...
System Version check: OK
/opt/prefect/healthcheck.py:147: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules <https://docs.prefect.io/api/latest/storage.html#docker>
  flows = cloudpickle_deserialization_check(flow_file_paths)
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 147, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 40, in cloudpickle_deserialization_check
    flows.append(cloudpickle.load(f))
ModuleNotFoundError: No module named 'prefectflows'
Removing intermediate container 0edd438d422e
The command '/bin/sh -c python /opt/prefect/healthcheck.py '["/opt/prefect/flows/z-etl-flow.prefect", "/opt/prefect/flows/z-mr-flow.prefect"]' '(3, 7)'' returned a non-zero code: 1
Traceback (most recent call last):
  File "register_flows.py", line 36, in <module>
    storage = storage.build()
  File "/Users/sagungargs/Documents/portcast/flows/.venv/lib/python3.7/site-packages/prefect/storage/docker.py", line 360, in build
    self._build_image(push=push)
  File "/Users/sagungargs/Documents/portcast/flows/.venv/lib/python3.7/site-packages/prefect/storage/docker.py", line 428, in _build_image
    "Your docker image failed to build! Your flow might have "
ValueError: Your docker image failed to build! Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
z
I don’t see the Docker `COPY` for `prefectflows` or `RUN pip install ./prefectflows` in your logs
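For illustration only (not the actual fix from this thread, and the paths below are guesses based on the error): the local `prefectflows` package needs to be copied into the image and pip-installed so that it can be re-imported when the flow is unpickled during the health check. With the legacy Docker storage that could look roughly like this, assuming your Prefect version exposes the `files` and `extra_dockerfile_commands` arguments:

```python
# Sketch of Docker storage that ships the local `prefectflows` package into
# the image; the local source path and registry URL are placeholders
from prefect.storage import Docker

storage = Docker(
    registry_url="my-registry.example.com",
    # copy the package source into the image...
    files={"/path/to/repo/prefectflows": "/prefectflows"},
    # ...and install it so `import prefectflows` works when the flow is unpickled
    extra_dockerfile_commands=["RUN pip install /prefectflows"],
)
```

If the version in use doesn't have `extra_dockerfile_commands`, pointing `Docker(dockerfile=...)` at a custom Dockerfile containing the equivalent `COPY` and `RUN pip install` lines should achieve the same result.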