Aaron
07/15/2025, 6:19 PM
```python
from prefect.deployments import Deployment
from prefect.filesystems import GCS

#### DEFINE COMMON VARIABLES ####
storage = GCS.load("test-block")
path = "deployments"
version = 1

#### IMPORT FLOWS ####
from deployments.test_deployment_one.test_pipeline_one import execute_pipeline as execute_pipeline_one
from deployments.test_deployment_two.test_pipeline_two import execute_pipeline as execute_pipeline_two

#### DEFINE DEPLOYMENTS ####
deployment = Deployment.build_from_flow(
    name="Testing 1",
    flow=execute_pipeline_one,
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path=path,
)
deployment.apply()

deployment = Deployment.build_from_flow(
    name="Testing 2",
    flow=execute_pipeline_two,
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path=path,
)
deployment.apply()
```
Marvin
07/15/2025, 6:20 PM
```
your_project/
├── common_functions/
│   ├── __init__.py
│   └── utils.py
├── flows/
│   ├── test_pipeline_one.py
│   └── test_pipeline_two.py
├── prefect.yaml
└── requirements.txt
```
2. Update your deployment script to use the modern Prefect 3.x approach:
```python
from prefect import flow
from prefect.filesystems import GCS

# Load your GCS block
storage = GCS.load("test-block")

# Shared deployment configuration
shared_config = dict(
    work_pool_name="default-agent-pool",
    work_queue_name="main",
    version="1",
)

# Deploy the first flow: from_source() points at the stored code and names the
# entrypoint ("<path within storage>:<flow function>" - adjust to your layout),
# while deploy() registers the deployment.
flow.from_source(
    source=storage,  # the storage block (or URL) containing your code
    entrypoint="flows/test_pipeline_one.py:execute_pipeline",
).deploy(
    name="Testing 1",
    **shared_config,
)

# Deploy the second flow
flow.from_source(
    source=storage,
    entrypoint="flows/test_pipeline_two.py:execute_pipeline",
).deploy(
    name="Testing 2",
    **shared_config,
)
```
3. In your flow files, you can import the common functions normally:
```python
# flows/test_pipeline_one.py
from prefect import flow
from common_functions.utils import your_shared_function

@flow
def execute_pipeline():
    your_shared_function()
    # rest of your flow code
```
Key improvements in this approach:
1. Uses the modern `flow.from_source()` and `.deploy()` pattern instead of the deprecated `Deployment.build_from_flow()`
2. Properly organizes shared code in a package structure
3. Avoids redundant code syncing since the entire package is treated as a unit
4. Makes use of Prefect 3.x's improved deployment mechanisms
To make this work effectively:
1. Make sure your `common_functions` directory has an `__init__.py` file to make it a proper Python package
2. When pushing to GCS, push the entire project directory structure
3. Consider using a `requirements.txt` or `setup.py` to manage dependencies
4. You might want to use a `prefect.yaml` file for more declarative deployment configuration (a minimal sketch follows below)
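For point 4, a minimal `prefect.yaml` along these lines might look like the following sketch (the bucket name, folder, entrypoints, and the prefect-gcp push/pull step names are assumptions to adapt to your setup):
```yaml
# prefect.yaml - a minimal sketch; names and paths are illustrative
name: your_project
prefect-version: 3.0.0

# Push the whole project to GCS once; every deployment pulls from the same location
push:
  - prefect_gcp.deployments.steps.push_to_gcs:
      bucket: your-bucket
      folder: deployments

pull:
  - prefect_gcp.deployments.steps.pull_from_gcs:
      bucket: your-bucket
      folder: deployments

deployments:
  - name: Testing 1
    entrypoint: flows/test_pipeline_one.py:execute_pipeline
    work_pool:
      name: default-agent-pool
      work_queue_name: main
  - name: Testing 2
    entrypoint: flows/test_pipeline_two.py:execute_pipeline
    work_pool:
      name: default-agent-pool
      work_queue_name: main
```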
Would you like me to provide more details about any of these aspects or help you with a specific part of the implementation?
Aaron
07/15/2025, 6:22 PM
Marvin
07/15/2025, 6:23 PM
`deployment.apply()` will indeed sync the code separately, which means your current setup is causing redundant uploads of the shared code. Here are a few approaches to handle this more efficiently:
1. Single Deployment Script Approach
Instead of syncing the code multiple times, you could create a single deployment that contains multiple flows:
```python
from prefect import flow
from prefect.filesystems import GCS
from prefect.deployments import Deployment

storage = GCS.load("test-block")
path = "deployments"
version = 1

# Import your flows
from deployments.test_deployment_one.test_pipeline_one import execute_pipeline as execute_pipeline_one
from deployments.test_deployment_two.test_pipeline_two import execute_pipeline as execute_pipeline_two

# Create a parent flow that can run either child flow
@flow
def parent_flow(flow_to_run: str):
    if flow_to_run == "pipeline_one":
        execute_pipeline_one()
    elif flow_to_run == "pipeline_two":
        execute_pipeline_two()

# Create single deployment
deployment = Deployment.build_from_flow(
    name="combined_deployment",
    flow=parent_flow,
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path=path,
)
deployment.apply()
```
2. Manual Sync Approach
You could manually sync your code to GCS first, then create deployments that reference the already-synced code:
```python
from prefect.filesystems import GCS
from prefect.deployments import Deployment

# Import your flows (as in your original script)
from deployments.test_deployment_one.test_pipeline_one import execute_pipeline as execute_pipeline_one
from deployments.test_deployment_two.test_pipeline_two import execute_pipeline as execute_pipeline_two

storage = GCS.load("test-block")
version = 1

# First, sync your code manually
storage.put_directory(local_path=".", remote_path="deployments")

# Then create deployments without re-uploading
deployment1 = Deployment.build_from_flow(
    name="Testing 1",
    flow=execute_pipeline_one,
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path="deployments",
    skip_upload=True,  # Important: skip the upload since the code is already there
)
deployment1.apply()

deployment2 = Deployment.build_from_flow(
    name="Testing 2",
    flow=execute_pipeline_two,
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path="deployments",
    skip_upload=True,  # Important: skip the upload since the code is already there
)
deployment2.apply()
```
3. Consider Using Workers
In Prefect 2.8.0, you might want to consider using workers instead, which can pull code from a Git repository. This approach naturally handles shared code better:
```python
from prefect.deployments import Deployment
from prefect.filesystems import GitHub

# Assuming your code is in a Git repository: create (or load) a GitHub storage block
github_storage = GitHub(
    repository="your-repo-url",
    reference="main",  # or a specific branch/tag
    access_token="{{ github_access_token }}",  # reference to a secret
)

deployment = Deployment.build_from_flow(
    name="Testing 1",
    flow=execute_pipeline_one,
    work_queue_name="main",
    storage=github_storage,
)
deployment.apply()
```
Given your current setup, I'd recommend either:
1. The "Single Deployment Script Approach" if you want to keep things simple and don't mind running flows through a parent flow
2. The "Manual Sync Approach" if you need to keep the flows separate but want to avoid redundant uploads
Would you like me to elaborate on any of these approaches or help you implement one of them?
Aaron
07/15/2025, 6:29 PM
Marvin
07/15/2025, 6:29 PM
You can add the `skip_upload=True` parameter in your existing code. This is the minimal change needed:
```python
from prefect.deployments import Deployment
from prefect.filesystems import GCS

# Flow imports as in your original script
from deployments.test_deployment_one.test_pipeline_one import execute_pipeline as execute_pipeline_one
from deployments.test_deployment_two.test_pipeline_two import execute_pipeline as execute_pipeline_two

storage = GCS.load("test-block")
path = "deployments"
version = 1

# First deployment
deployment = Deployment.build_from_flow(
    name="Testing 1",
    flow=execute_pipeline_one,
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path=path,
    skip_upload=True,  # Add this line
)
deployment.apply()

# Second deployment
deployment = Deployment.build_from_flow(
    name="Testing 2",
    flow=execute_pipeline_two,
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path=path,
    skip_upload=True,  # Add this line
)
deployment.apply()
```
Then, you can manually upload your code once to GCS before running this script:
```python
storage.put_directory(local_path=".", remote_path=path)
```
This way:
1. You upload the code once manually
2. Your deployments will use the already-uploaded code instead of trying to upload it again
3. You keep your existing code structure almost exactly the same
Would you like me to explain more about how this works or help you implement it?
Aaron
07/15/2025, 6:32 PM
Marvin
07/15/2025, 6:32 PM
```python
from prefect.deployments import Deployment
from prefect.filesystems import GCS

#### DEFINE COMMON VARIABLES ####
storage = GCS.load("test-block")
path = "deployments"
version = 1

#### IMPORT FLOWS ####
from deployments.test_deployment_one.test_pipeline_one import execute_pipeline as execute_pipeline_one
from deployments.test_deployment_two.test_pipeline_two import execute_pipeline as execute_pipeline_two

# First, sync all code to GCS once
storage.put_directory(local_path=".", remote_path=path)

# List of all deployments to create
deployments_config = [
    {
        "name": "Testing 1",
        "flow": execute_pipeline_one,
    },
    {
        "name": "Testing 2",
        "flow": execute_pipeline_two,
    },
]

# Common deployment settings
common_settings = dict(
    work_queue_name="main",
    work_pool_name="default-agent-pool",
    version=version,
    storage=storage,
    path=path,
    skip_upload=True,  # Skip upload since we already did it once
)

# Create all deployments
for config in deployments_config:
    deployment = Deployment.build_from_flow(
        **config,
        **common_settings,
    )
    deployment.apply()
```
This approach:
1. Syncs the code to GCS once at the start
2. Creates all deployments using the already-synced code
3. Is easy to maintain - just add new deployments to the `deployments_config` list
4. Works well in CI/CD as it's a single script that handles everything
You could put this in a file like `deploy.py` and then in your CI/CD pipeline (like GitHub Actions, GitLab CI, etc.), you would just run this script after merging to main.
For example, in a GitHub Action it might look like:
```yaml
name: Deploy to Prefect
on:
  push:
    branches:
      - main
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install prefect prefect-gcp
      - name: Deploy to Prefect
        run: python deploy.py
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
          GCP_CREDENTIALS: ${{ secrets.GCP_CREDENTIALS }}
```
This gives you a single, automated process that:
1. Only syncs the code once
2. Creates/updates all your deployments
3. Runs automatically when code is merged to main
4. Is easy to maintain and extend
Would you like me to explain any part of this in more detail or help you adapt it to your specific CI/CD platform?
Aaron
07/16/2025, 12:21 PM
Marvin
07/16/2025, 12:23 PM
### 1. Project Structure
```
your_project/
├── flows/
│   ├── __init__.py
│   ├── test_pipeline_one.py
│   └── test_pipeline_two.py
├── common_functions/
│   └── utils.py
├── deployments/
│   ├── deploy.py
│   ├── config.py
│   └── environments/
│       ├── dev.yaml
│       └── prod.yaml
├── requirements.txt
└── README.md
```
### 2. Environment Configuration Files
`deployments/environments/dev.yaml`:
```yaml
prefect_api_url: "https://api.prefect.cloud/api/accounts/[YOUR-ACCOUNT]/workspaces/[DEV-WORKSPACE]"
gcs_bucket: "dev-bucket"
gcs_path: "prefect-deployments/dev"
work_pool_name: "dev-pool"
work_queue_name: "dev"
```
`deployments/environments/prod.yaml`:
```yaml
prefect_api_url: "https://api.prefect.cloud/api/accounts/[YOUR-ACCOUNT]/workspaces/[PROD-WORKSPACE]"
gcs_bucket: "prod-bucket"
gcs_path: "prefect-deployments/prod"
work_pool_name: "prod-pool"
work_queue_name: "prod"
```
### 3. Configuration Manager
`deployments/config.py`:
```python
import yaml
from dataclasses import dataclass

@dataclass
class Environment:
    prefect_api_url: str
    gcs_bucket: str
    gcs_path: str
    work_pool_name: str
    work_queue_name: str

    @classmethod
    def load(cls, env: str) -> "Environment":
        with open(f"deployments/environments/{env}.yaml", "r") as f:
            config = yaml.safe_load(f)
        return cls(**config)
```
### 4. Deployment Script
`deployments/deploy.py`:
```python
import sys
from prefect.deployments import Deployment
from prefect.filesystems import GCS
from config import Environment

def create_deployments(env_name: str):
    # Load environment configuration
    env = Environment.load(env_name)

    # Configure GCS storage (the GCS block stores its location in `bucket_path`)
    storage = GCS.load("gcs-block")
    storage.bucket_path = env.gcs_bucket

    # Import flows
    from flows.test_pipeline_one import execute_pipeline as execute_pipeline_one
    from flows.test_pipeline_two import execute_pipeline as execute_pipeline_two

    # Define deployments configuration
    deployments_config = [
        {
            "name": f"pipeline-one-{env_name}",
            "flow": execute_pipeline_one,
            "tags": [env_name],
        },
        {
            "name": f"pipeline-two-{env_name}",
            "flow": execute_pipeline_two,
            "tags": [env_name],
        },
    ]

    # First sync all code
    storage.put_directory(
        local_path=".",
        remote_path=env.gcs_path,
    )

    # Common deployment settings
    common_settings = dict(
        work_queue_name=env.work_queue_name,
        work_pool_name=env.work_pool_name,
        storage=storage,
        path=env.gcs_path,
        skip_upload=True,
    )

    # Create all deployments
    for config in deployments_config:
        deployment = Deployment.build_from_flow(
            **config,
            **common_settings,
        )
        deployment.apply()

if __name__ == "__main__":
    env_name = sys.argv[1] if len(sys.argv) > 1 else "dev"
    create_deployments(env_name)
```
### 5. GitHub Actions Workflow
`.github/workflows/prefect-cicd.yml`:
```yaml
name: Prefect CI/CD
on:
  push:
    branches:
      - develop
      - main
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Set environment
        id: set-env
        run: |
          if [[ ${{ github.ref }} == 'refs/heads/develop' ]]; then
            echo "::set-output name=env::dev"
          elif [[ ${{ github.ref }} == 'refs/heads/main' ]]; then
            echo "::set-output name=env::prod"
          fi
      - name: Deploy to Prefect
```
Marvin
07/16/2025, 12:23 PM
### 6. Usage
The workflow works as follows:
1. *Development Process*:
- Developers work on feature branches
- When ready, they merge to `develop` branch
- This triggers deployment to Dev environment
- After testing in Dev, code is merged to `main`
- This triggers deployment to Prod environment
2. *Running Locally*:
```bash
# Deploy to dev
python deployments/deploy.py dev

# Deploy to prod
python deployments/deploy.py prod
```
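For completeness, a sketch of the final "Deploy to Prefect" step in the workflow above, assuming the `deploy.py` script and the `set-env` output shown earlier, plus secrets named `PREFECT_API_KEY` and `GCP_CREDENTIALS`:
```yaml
      - name: Deploy to Prefect
        run: python deployments/deploy.py ${{ steps.set-env.outputs.env }}
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
          GCP_CREDENTIALS: ${{ secrets.GCP_CREDENTIALS }}
```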
### 7. Additional Considerations
1. Secrets Management:
- Store sensitive values in GitHub Secrets
- Use different API keys for Dev and Prod
- Store GCP credentials securely
2. Block Configuration:
- Create separate GCS blocks for Dev and Prod
- Use different service accounts for each environment
3. Testing:
- Add unit tests for flows
- Add integration tests to run in Dev before Prod deployment
- Consider adding flow testing in the CI/CD pipeline (a minimal unit-test sketch follows this list)
4. Monitoring:
- Set up different notifications for Dev and Prod
- Configure more stringent monitoring for Prod
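To illustrate the testing point above, here is a minimal unit-test sketch (assuming pytest and flows that take no required arguments; `prefect_test_harness` runs flows against a temporary local backend):
```python
# tests/test_flows.py - a sketch; adjust imports and arguments to your flows
import pytest
from prefect.testing.utilities import prefect_test_harness

from flows.test_pipeline_one import execute_pipeline


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    # All tests in the session run against a throwaway Prefect database
    with prefect_test_harness():
        yield


def test_execute_pipeline_completes():
    # Calling the flow function runs it locally; an exception fails the test
    execute_pipeline()
```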
### 8. Best Practices
1. Version Control:
- Tag releases when deploying to production
- Use semantic versioning for deployments (see the tagging sketch after this list)
- Keep deployment configurations in version control
2. Security:
- Use separate service accounts for Dev and Prod
- Restrict Prod deployment permissions
- Implement approval process for Prod deployments
3. Documentation:
- Document deployment process
- Maintain changelog
- Document environment-specific configurations
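As a sketch of tying release tags to deployment versions (the tag value and the way `deploy.py` would consume it are assumptions):
```bash
# Tag the release that is being promoted to production
git tag -a v1.2.0 -m "Release 1.2.0"
git push origin v1.2.0

# In CI, derive the version from the tag and hand it to the deploy script
VERSION=$(git describe --tags --abbrev=0)
python deployments/deploy.py prod   # deploy.py could pass $VERSION through as the deployment version
```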
Would you like me to elaborate on any of these aspects or help you implement a specific part of this setup?
Aaron
07/16/2025, 12:24 PM
Marvin
07/16/2025, 12:26 PM
### 1. Project Structure
```
your_project/
├── flows/
│   ├── __init__.py
│   ├── test_pipeline_one.py
│   └── test_pipeline_two.py
├── common_functions/
│   └── utils.py
├── deployments/
│   ├── deploy.py
│   ├── config.py
│   └── environments/
│       ├── dev.yaml
│       └── prod.yaml
├── requirements.txt
└── README.md
```
### 2. Environment Configuration Files
`deployments/environments/dev.yaml`:
```yaml
prefect_api_url: "http://dev-prefect-server:4200/api"  # Dev server instance
gcs_bucket: "dev-bucket"
gcs_path: "prefect-deployments/dev"
work_pool_name: "dev-pool"
work_queue_name: "dev"
```
`deployments/environments/prod.yaml`:
```yaml
prefect_api_url: "http://prod-prefect-server:4200/api"  # Prod server instance
gcs_bucket: "prod-bucket"
gcs_path: "prefect-deployments/prod"
work_pool_name: "prod-pool"
work_queue_name: "prod"
```
### 3. Server Profile Setup Script
`deployments/setup_profiles.py`:
```python
import subprocess
import sys
import yaml

def setup_profile(env: str, config_file: str):
    """Set up a Prefect profile for the specified environment."""
    # Create new profile
    subprocess.run(["prefect", "profile", "create", env])

    # Set it as the active profile
    subprocess.run(["prefect", "profile", "use", env])

    # Configure the API URL from the environment's config file
    with open(config_file, "r") as f:
        config = yaml.safe_load(f)
    api_url = config["prefect_api_url"]
    subprocess.run(["prefect", "config", "set", f"PREFECT_API_URL={api_url}"])

if __name__ == "__main__":
    env = sys.argv[1]
    setup_profile(env, f"deployments/environments/{env}.yaml")
```
### 4. Deployment Script
`deployments/deploy.py`:
```python
import sys
import subprocess
from prefect.deployments import Deployment
from prefect.filesystems import GCS
from config import Environment

def switch_profile(env_name: str):
    """Switch to the correct Prefect profile."""
    subprocess.run(["prefect", "profile", "use", env_name])

def create_deployments(env_name: str):
    # Switch to the correct profile
    switch_profile(env_name)

    # Load environment configuration
    env = Environment.load(env_name)

    # Configure GCS storage (separate storage blocks for each env;
    # the GCS block stores its location in `bucket_path`)
    storage = GCS.load(f"gcs-{env_name}")
    storage.bucket_path = env.gcs_bucket

    # Import flows
    from flows.test_pipeline_one import execute_pipeline as execute_pipeline_one
    from flows.test_pipeline_two import execute_pipeline as execute_pipeline_two

    # Define deployments configuration
    deployments_config = [
        {
            "name": f"pipeline-one-{env_name}",
            "flow": execute_pipeline_one,
            "tags": [env_name],
        },
        {
            "name": f"pipeline-two-{env_name}",
            "flow": execute_pipeline_two,
            "tags": [env_name],
        },
    ]

    # First sync all code
    storage.put_directory(
        local_path=".",
        remote_path=env.gcs_path,
    )

    # Common deployment settings
    common_settings = dict(
        work_queue_name=env.work_queue_name,
        work_pool_name=env.work_pool_name,
        storage=storage,
        path=env.gcs_path,
        skip_upload=True,
    )

    # Create all deployments
    for config in deployments_config:
        deployment = Deployment.build_from_flow(
            **config,
            **common_settings,
        )
        deployment.apply()

if __name__ == "__main__":
    env_name = sys.argv[1] if len(sys.argv) > 1 else "dev"
    create_deployments(env_name)
```
### 5. GitHub Actions Workflow
`.github/workflows/prefect-cicd.yml`:
```yaml
name: Prefect CI/CD
on:
  push:
    branches:
      - develop
      - main
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
```
Marvin
07/16/2025, 12:26 PM
### 6. Initial Setup Steps
1. *Set up Prefect Server instances*:
```bash
# Dev Server (on dev environment)
prefect server start --host 0.0.0.0 --port 4200

# Prod Server (on prod environment)
prefect server start --host 0.0.0.0 --port 4200
```
2. Create profiles for each environment:
```bash
# Create and configure dev profile
python deployments/setup_profiles.py dev

# Create and configure prod profile
python deployments/setup_profiles.py prod
```
3. Create GCS blocks for each environment:
```bash
# Switch to dev profile and create block
prefect profile use dev
prefect block register -m prefect_gcp
# Create GCS block named 'gcs-dev' via UI or CLI

# Switch to prod profile and create block
prefect profile use prod
# Create GCS block named 'gcs-prod' via UI or CLI
```
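If you prefer creating the blocks in code instead of through the UI, a minimal sketch (the bucket paths are placeholders) might look like:
```python
# create_blocks.py - a sketch; run with the matching profile active
from prefect.filesystems import GCS

# With the dev profile active (`prefect profile use dev`):
GCS(bucket_path="dev-bucket/prefect-deployments/dev").save("gcs-dev", overwrite=True)

# Repeat with the prod profile active, saving a block named "gcs-prod"
# that points at "prod-bucket/prefect-deployments/prod".
```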
### 7. Key Differences from Cloud Version
1. Profile Management:
- Uses Prefect profiles to manage different server instances
- Each environment has its own server instance
- No workspace concept like in Prefect Cloud
2. Authentication:
- No API keys needed (unless you've configured authentication)
- Server URLs are internal/private network addresses
- Might need VPN/SSH tunneling for remote access (see the sketch after this list)
3. Block Management:
- Blocks need to be created separately in each server instance
- Use environment-specific names for blocks
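For example, a simple SSH tunnel for reaching a remote server's API/UI from your machine might look like this (hostname and user are placeholders):
```bash
# Forward local port 4200 to the prod server's Prefect API/UI
ssh -N -L 4200:localhost:4200 user@prod-prefect-server
# Then point your local profile at http://localhost:4200/api
```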
### 8. Best Practices for Server Setup
1. Server Infrastructure:
- Use separate servers/containers for Dev and Prod
- Configure proper networking and security groups
- Consider using PostgreSQL instead of SQLite for production (see the sketch after this list)
2. Backup and Maintenance:
- Regular database backups for Prod server
- Monitoring for server health
- Update strategies for server maintenance
3. Security:
- Configure authentication if needed
- Use internal network for server communication
- Implement proper firewall rules
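To illustrate the PostgreSQL point, a sketch of pointing the server at Postgres before starting it (connection details are placeholders, and the exact setting name can differ between Prefect versions):
```bash
# With the prod profile active
prefect config set PREFECT_SERVER_DATABASE_CONNECTION_URL="postgresql+asyncpg://prefect:<password>@db-host:5432/prefect"
prefect server start --host 0.0.0.0 --port 4200
```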
Would you like me to elaborate on any of these aspects or help you implement a specific part of this setup?
Aaron
07/16/2025, 12:35 PM
Marvin
07/16/2025, 12:36 PM
```python
# server_setup.py (referenced below as the setup script)
import os
import subprocess
import argparse
from pathlib import Path

class PrefectServerManager:
    def __init__(self, env: str):
        self.env = env
        self.base_dir = Path.home() / ".prefect" / env

        # Configuration for different environments
        self.configs = {
            "dev": {
                "port": 4200,
                "host": "127.0.0.1",
                "database_url": f"sqlite:///{self.base_dir}/prefect.db",
            },
            "prod": {
                "port": 4201,  # Different port for prod
                "host": "127.0.0.1",
                "database_url": f"sqlite:///{self.base_dir}/prefect.db",
            },
        }

        # Ensure environment is valid
        if env not in self.configs:
            raise ValueError(f"Environment must be one of: {list(self.configs.keys())}")
        self.config = self.configs[env]

    def setup_environment(self):
        """Create necessary directories and set up the environment's profile."""
        # Create directory for environment
        os.makedirs(self.base_dir, exist_ok=True)

        # Create a profile for this environment and make it active
        subprocess.run(["prefect", "profile", "create", self.env])
        subprocess.run(["prefect", "profile", "use", self.env])

        # Point the active profile at this environment's API
        api_url = f"http://{self.config['host']}:{self.config['port']}/api"
        subprocess.run(["prefect", "config", "set", f"PREFECT_API_URL={api_url}"])

        # Set database URL for the server process
        os.environ["PREFECT_SERVER_DATABASE_CONNECTION_URL"] = self.config["database_url"]

    def start_server(self):
        """Start the Prefect server for this environment."""
        print(f"Starting Prefect server for {self.env} environment...")
        print(f"API URL will be: http://{self.config['host']}:{self.config['port']}/api")
        print(f"Database URL: {self.config['database_url']}")

        # Start the server
        subprocess.run([
            "prefect", "server", "start",
            "--host", self.config["host"],
            "--port", str(self.config["port"]),
        ])

def main():
    parser = argparse.ArgumentParser(description="Manage Prefect Server instances")
    parser.add_argument("environment", choices=["dev", "prod"], help="Environment to manage")
    parser.add_argument("--setup-only", action="store_true", help="Only set up environment without starting server")
    args = parser.parse_args()

    manager = PrefectServerManager(args.environment)
    manager.setup_environment()

    if not args.setup_only:
        manager.start_server()

if __name__ == "__main__":
    main()
```
### Usage Instructions
1. Initial Setup
First, create both environments:
```bash
# Setup dev environment
python server_setup.py dev --setup-only

# Setup prod environment
python server_setup.py prod --setup-only
```
2. Starting Servers
You'll need two terminal windows to run both servers simultaneously:
Terminal 1 (Dev Server):
```bash
python server_setup.py dev
```
Terminal 2 (Prod Server):
```bash
python server_setup.py prod
```
3. Verifying Setup
You can verify the setup by checking each server:
```bash
# Check dev server
prefect profile use dev
prefect server status

# Check prod server
prefect profile use prod
prefect server status
```
### Server URLs:
- Dev: http://127.0.0.1:4200
- Prod: http://127.0.0.1:4201
### Quick Reference for Profiles
Switch between environments:
```bash
# Switch to dev
prefect profile use dev

# Switch to prod
prefect profile use prod

# List all profiles
prefect profile ls
```
### Testing Your Setup
Here's a simple test script to verify both environments:
`test_environments.py`:
```python
import subprocess
import requests
import time
```
Marvin
07/16/2025, 12:36 PM
`prefect profile use [env]`
4. Local Testing:
- This setup is for local testing/development
- For actual production, you'd want to:
  - Use PostgreSQL instead of SQLite
  - Set up proper authentication
  - Use proper networking/firewall rules
  - Consider containerization (Docker)
Would you like me to explain any part of this setup in more detail or help you with testing it?