Pav Staz
07/30/2025, 5:19 PM@main_pipe.on_failure
def notify_if_failed(flow, flow_run, state):
failure_notification(flow, flow_run, state)
if __name__ == "__main__":
main_pipe.serve(
name="main-pipe",
cron="0 4 * * *"
)
in the same directory as the 'pipeline' folder I have a 'utils' folder which contains a messaging.py
so in the pipeline/main.py I have the line
from utils.messaging import _failure_notification_
When i try to deploy the pipeline/main.py I get this error
Traceback (most recent call last):
File "/opt/project/pipelines/main.py", line 5, in <module>
from utils.messaging import failure_notification
ModuleNotFoundError: No module named 'utils'
the utils folder and the pipelines folder has an __init__.py
if main.py is in the root directory I do not get this error
Is it the way prefect deploys flows that I can't import? Or is there something I can do?Marvin
07/30/2025, 5:19 PMMarvin
07/30/2025, 5:20 PMpython
if __name__ == "__main__":
main_pipe.serve(
name="main-pipe",
cron="0 4 * * *",
job_variables={
"env": {
"PYTHONPATH": "/path/to/your/project/root"
}
}
)
2. Modify Import Structure
Instead of having your code in subdirectories, you could restructure your project to use a more Python-friendly layout:
your_project/
├── src/
│ ├── __init__.py
│ ├── pipeline/
│ │ ├── __init__.py
│ │ └── main.py
│ └── utils/
│ ├── __init__.py
│ └── messaging.py
Then modify your import to be:
python
from src.utils.messaging import failure_notification
3. Use sys.path Modification (Not Recommended, but Quick Fix)
At the top of your main.py, you could add:
python
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
4. Package Your Code
The most proper solution would be to package your code properly with setuptools. Create a setup.py file in your root directory:
python
from setuptools import setup, find_packages
setup(
name="your-project-name",
packages=find_packages(),
version="0.1.0",
)
Then install it in development mode:
bash
pip install -e .
For your specific case, I would recommend either solution #1 (PYTHONPATH) or #2 (restructuring). The PYTHONPATH solution is the quickest fix that doesn't require code changes, while restructuring would be a more maintainable long-term solution.
When Prefect deploys a flow, it needs to be able to find all the modules that the flow depends on. By default, it only has access to installed packages and the immediate directory containing the flow file. By setting PYTHONPATH or restructuring your imports, you ensure that Python can find all the necessary modules during deployment.
Let me know if you'd like me to elaborate on any of these solutions or if you need help implementing a specific approach!Pav Staz
07/30/2025, 5:33 PMPav Staz
07/30/2025, 5:33 PMMarvin
07/30/2025, 5:33 PMMarvin
07/30/2025, 5:33 PMpython
if __name__ == "__main__":
main_pipe.serve(
name="main-pipe",
cron="0 4 * * *",
job_variables={
"env": {
"PYTHONPATH": "/opt/project" # Adjust this path to your project root
}
}
)
2. Solution 2: Use Relative Imports (More Pythonic)
Instead of using from src.utils.messaging
, use relative imports. In your `pipeline/main.py`:
python
from ...utils.messaging import failure_notification
This assumes your directory structure is:
/opt/project/
└── src/
├── __init__.py
├── pipeline/
│ ├── __init__.py
│ └── main.py
└── utils/
├── __init__.py
└── messaging.py
3. Solution 3: Package Your Code (Most Robust)
Create a setup.py
or pyproject.toml
in your project root:
python
# setup.py
from setuptools import setup, find_packages
setup(
name="your-project-name",
packages=find_packages(),
version="0.1.0",
)
Then in your deployment configuration:
python
if __name__ == "__main__":
main_pipe.serve(
name="main-pipe",
cron="0 4 * * *",
install_commands=[
"pip install -e /opt/project" # Adjust path as needed
]
)
I recommend starting with Solution 1 (PYTHONPATH) as it's the quickest fix. If that doesn't work, try Solution 2 (relative imports). Solution 3 (packaging) is the most robust but requires more setup.
The reason you're seeing this issue is that when Prefect deploys your flow, it creates a new Python environment for the flow run, and this environment doesn't automatically know about your project's structure. By using one of these solutions, you're explicitly telling Python where to find your modules.
Let me know if you'd like me to explain any of these solutions in more detail or if you need help implementing them!Pav Staz
07/31/2025, 9:22 AMMarvin
07/31/2025, 9:22 AMMarvin
07/31/2025, 9:23 AMbash
export PYTHONPATH=/opt/project:$PYTHONPATH
python src/pipeline/main.py
2. Using a Deployment (Recommended)
Create a prefect.yaml
file in your project root:
yaml
deployments:
- name: main-pipe
entrypoint: src/pipeline/main.py:main_pipe
schedule:
cron: "0 4 * * *"
work_pool:
name: your-work-pool-name
job_variables:
env:
PYTHONPATH: /opt/project
Then deploy using:
bash
prefect deploy
3. Alternative: Use deploy()
in Code
Instead of using serve()
, you can use the more modern deploy()
method:
python
if __name__ == "__main__":
main_pipe.deploy(
name="main-pipe",
schedule={"cron": "0 4 * * *"},
work_pool_name="your-work-pool-name",
job_variables={
"env": {
"PYTHONPATH": "/opt/project"
}
}
)
I recommend using option 2 (deployment with prefect.yaml
) or option 3 (deploy()
) as they're more in line with current Prefect 3.x practices. The serve()
method is more limited in terms of configuration options.
Would you like me to explain any of these approaches in more detail? Also, which work pool are you using? That might affect which approach would work best for your situation.Pav Staz
07/31/2025, 9:33 AMif __name__ == "__main__":
main1.serve(
name="daily-pipeline",
cron="0 4 * * *"
)
main2.serve(
name="daily-pipeline1",
cron="0 4 * * *"
)
Marvin
07/31/2025, 9:33 AMMarvin
07/31/2025, 9:34 AMserve()
is a blocking operation - it runs indefinitely to manage the flow deployment. When you call serve()
, it starts a long-running process that continuously watches for scheduled runs and triggers.
Here are two ways to serve multiple flows:
1. Serve Multiple Flows Together (Recommended)
python
if __name__ == "__main__":
main1.serve(
name="daily-pipeline",
cron="0 4 * * *",
other_flows=[main2] # Add other flows here
)
2. Use Deployments Instead (More Flexible)
python
if __name__ == "__main__":
main1.deploy(
name="daily-pipeline",
schedule={"cron": "0 4 * * *"}
)
main2.deploy(
name="daily-pipeline1",
schedule={"cron": "0 4 * * *"}
)
The second approach using deploy()
is generally more flexible and aligns better with Prefect 3.x practices. After deploying, you can start a worker to run these deployments:
bash
prefect worker start -p "your-work-pool-name"
Would you like me to explain either of these approaches in more detail?