datamongus
07/14/2025, 6:40 PMMarvin
07/14/2025, 6:40 PMMarvin
07/14/2025, 6:40 PMpython
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from prefect import flow
from typing import Dict, Any
# Define your request/response models
class MarvinRequest(BaseModel):
instructions: str
data: str
class MarvinResponse(BaseModel):
data: Dict[str, Any]
app = FastAPI()
# Your existing Marvin flow
@flow
def marvin_flow(instructions: str, data: str) -> Dict[str, Any]:
# Your existing Marvin flow logic here
return {"data": {}} # Replace with your actual processing
# FastAPI endpoint
@app.post("/process", response_model=MarvinResponse)
async def process_data(request: MarvinRequest):
try:
# Execute the flow directly
result = marvin_flow(
instructions=request.instructions,
data=request.data
)
return {"data": result["data"]}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Health check endpoint
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
To use this setup:
1. Save the code in a file (e.g., api.py
)
2. Install the required dependencies:
bash
pip install fastapi uvicorn prefect
3. Run the FastAPI server:
bash
uvicorn api:app --host 0.0.0.0 --port 8000
You can then make POST requests to <http://localhost:8000/process>
with JSON payloads like:
json
{
"instructions": "your instructions here",
"data": "your data here"
}
Some important notes about this implementation:
1. This approach runs the flow directly in the API request. This is suitable for simple, quick-running flows. If your flows take longer to execute, you might want to consider using background tasks.
2. If you need to handle longer-running flows, you can modify the endpoint to use Prefect's background task capabilities. Here's how you would modify the endpoint for that:
python
@app.post("/process", response_model=MarvinResponse)
async def process_data(request: MarvinRequest):
try:
# Submit the flow run asynchronously
flow_run = marvin_flow.submit(
instructions=request.instructions,
data=request.data
)
return {"data": {"flow_run_id": flow_run.id}}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
3. The implementation includes basic error handling and a health check endpoint.
4. The Pydantic models (MarvinRequest
and MarvinResponse
) provide automatic validation of your input and output data.
5. FastAPI automatically generates OpenAPI documentation for your endpoint, which you can access at <http://localhost:8000/docs>
when running the server.
If your Marvin flows are indeed simple and quick to execute, the first implementation should work well. However, if you find that the flows take longer to process or you need more robust handling of concurrent requests, you might want to consider the background task approach.
Would you like me to provide more details about any particular aspect of this implementation, such as background tasks or error handling?