Jozi Gila
08/07/2025, 9:52 AM

Marvin
08/07/2025, 9:53 AM

1. Project Structure
your_project/
├── app/
│   ├── __init__.py
│   ├── models/            # Shared SQLAlchemy models
│   │   ├── __init__.py
│   │   └── common.py
│   ├── services/          # Shared service layer
│   │   ├── __init__.py
│   │   └── base.py
│   ├── api/               # FastAPI routes
│   │   └── endpoints.py
│   └── flows/             # Prefect flows
│       └── tasks.py
├── main.py                # FastAPI application
└── worker.py              # Prefect worker
2. Shared Models Example
python
# app/models/common.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class CommonModel(Base):
    __tablename__ = "common_table"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    created_at = Column(DateTime)
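If the shared table does not exist yet, it can be created once against the common database. A minimal sketch, assuming a DATABASE_URL environment variable with a SQLite fallback (both are illustrative, not part of the code above):
python
# One-time table creation against the shared database (illustrative).
import os

from sqlalchemy import create_engine

from app.models.common import Base

# DATABASE_URL is an assumed environment variable shared by FastAPI and the worker
engine = create_engine(os.environ.get("DATABASE_URL", "sqlite:///app.db"))
Base.metadata.create_all(engine)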
3. Shared Service Layer
python
# app/services/base.py
from sqlalchemy.orm import Session

from ..models.common import CommonModel

class BaseService:
    def __init__(self, db: Session):
        self.db = db

    def get_by_id(self, model_id: int) -> CommonModel:
        return self.db.query(CommonModel).filter(CommonModel.id == model_id).first()
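To show the service layer in use on the API side, here is a minimal sketch of app/api/endpoints.py with a request-scoped session dependency. The DATABASE_URL variable, SQLite fallback, and get_db helper are assumptions for this example, not part of the structure above:
python
# app/api/endpoints.py (illustrative sketch)
import os

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

from app.services.base import BaseService

# Assumed shared configuration: both FastAPI and the Prefect worker read DATABASE_URL
engine = create_engine(os.environ.get("DATABASE_URL", "sqlite:///app.db"))
SessionLocal = sessionmaker(bind=engine)

router = APIRouter()  # include in main.py with app.include_router(router)

def get_db():
    # Yield one session per request and always close it afterwards
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@router.get("/items/{item_id}")
def read_item(item_id: int, db: Session = Depends(get_db)):
    item = BaseService(db).get_by_id(item_id)
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"id": item.id, "name": item.name}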
4. FastAPI Integration
python
# main.py
from fastapi import FastAPI
from prefect.deployments import run_deployment

app = FastAPI()

@app.post("/process/{item_id}")
async def process_item(item_id: int):
    # Trigger the pre-created "process-data" deployment (see the deployment
    # sketch below); the deployment is created once, not on every request.
    flow_run = await run_deployment(
        name="process-data-flow/process-data",
        parameters={"item_id": item_id},
        timeout=0,  # return immediately; a Prefect worker executes the run
    )
    return {"message": "Processing started", "flow_run_id": str(flow_run.id)}
5. Prefect Flow Definition
python
# app/flows/tasks.py
import os

from prefect import flow, task
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from app.services.base import BaseService

# Shared database configuration; the DATABASE_URL variable and SQLite fallback
# are example values, both services should point at the same database
engine = create_engine(os.environ.get("DATABASE_URL", "sqlite:///app.db"))

@task
def process_item_task(item_id: int, db: Session):
    service = BaseService(db)
    item = service.get_by_id(item_id)
    # Process the item
    return item

@flow
def process_data_flow(item_id: int):
    # Open a session bound to the shared engine for the duration of the run
    with Session(engine) as db:
        result = process_item_task(item_id, db)
    return result
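For quick local testing, the flow can also be invoked directly as a plain function call, for example process_data_flow(item_id=1), before wiring it up to a work pool and deployment.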
Key Points and Best Practices:
1. Shared Database Session Management:
- Use a common database configuration that can be accessed by both FastAPI and Prefect
- Implement proper session management in both contexts
- Consider using dependency injection for database sessions
2. Service Layer Sharing:
- Create service classes that can be used by both FastAPI endpoints and Prefect tasks
- Keep business logic in the service layer to maintain DRY principles
- Use dependency injection to make services testable
3. Asynchronous Operations:
- FastAPI endpoints can trigger Prefect flows asynchronously
- Use Prefect's async capabilities when needed
- Consider using background tasks for non-blocking operations
4. Error Handling:
- Implement proper error handling in both FastAPI and Prefect contexts
- Use Prefect's built-in retry mechanisms for resilient task execution (see the sketch after this list)
- Properly propagate errors to the API clients
5. Monitoring and Logging:
- Use Prefect's built-in monitoring capabilities
- Implement consistent logging across both systems
- Consider using correlation IDs to track requests across systems
6. Configuration Management:
- Use environment variables or configuration files accessible to both systems
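As referenced in points 4 and 5, here is a minimal illustrative sketch of a task variant that uses Prefect's retries and run logger. The retry values, the task name, and the correlation_id parameter are assumptions, not part of the code above:
python
# Illustrative variant: resilient task execution with retries and Prefect logging.
from prefect import task, get_run_logger

@task(retries=3, retry_delay_seconds=10)
def resilient_process_item(item_id: int, correlation_id: str) -> int:
    logger = get_run_logger()
    # correlation_id is a hypothetical value forwarded from the API request so
    # the flow run can be traced back to the originating HTTP call.
    logger.info("Processing item %s (correlation_id=%s)", item_id, correlation_id)
    # Any exception raised below is retried up to 3 times, 10 seconds apart.
    return item_id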
Marvin
08/07/2025, 9:53 AM
1. Install the required dependencies:
bash
pip install "prefect>=3.0.0" fastapi sqlalchemy uvicorn
2. Create your work pool:
bash
prefect work-pool create your-work-pool --type process
3. Start your worker:
bash
prefect worker start -p your-work-pool
4. Run your FastAPI application:
bash
uvicorn main:app --reload
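Before the first request hits the /process endpoint, make sure the process-data deployment exists, for example by running the illustrative deploy.py script sketched earlier once after the work pool is created.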
This architecture allows you to:
- Share database models and business logic between FastAPI and Prefect
- Maintain clean separation of concerns
- Scale your background processing independently
- Monitor and manage long-running tasks effectively
- Keep your API endpoints responsive while handling heavy processing in the background
Remember to adjust the structure based on your specific needs and scale. You might want to add more layers of abstraction or additional components depending on your use case.