Macphail Magwira
03/20/2025, 9:29 PMChris White
Macphail Magwira
03/21/2025, 8:06 AMMacphail Magwira
03/21/2025, 8:06 AMMacphail Magwira
03/21/2025, 8:07 AM
# src/harmony_backend/api/v1/endpoints/workflow.py
import logging
from typing import Any, Dict
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.orm import Session
from harmony_backend.api.dependencies.auth import get_current_user
from harmony_backend.db.session import get_db
from harmony_backend.schemas.workflows.workflow import WorkflowDefinition, User
from harmony_backend.services.workflows.workflow_service import WorkflowService
from harmony_backend.db.daos.workflow.workflow_dao import WorkflowDAO
# Router on which this module's endpoints are registered (see the @router.post below)
router = APIRouter()
# Module-level logger named after this module; no handlers or levels are configured here
logger = logging.getLogger(__name__)
@router.post("/deploy", response_model=dict)
async def deploy_workflow(
    request: Request,
    workflow_def: WorkflowDefinition,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> Dict[str, Any]:
    """
    Deploy a workflow to Prefect and schedule its first run.

    Requires authentication with a valid JWT token.

    Steps performed:
      1. Generate a flow function from ``workflow_def`` for ``current_user``.
      2. Deploy that flow without executing it.
      3. Create a flow run that is scheduled to start after a short delay,
         so the response can be returned to the caller immediately.
      4. Store the resulting flow id on the workflow record via ``WorkflowDAO``.

    Args:
        request: The incoming request (currently unused by the handler).
        workflow_def: Definition of the workflow to deploy.
        db: Database session used by the DAO.
        current_user: The authenticated user performing the deployment.

    Returns:
        A dict with the keys ``status``, ``message``, ``deployment_id``,
        ``flow_id``, ``flow_run_id`` and ``deployed_by``.

    Raises:
        HTTPException: Re-raised unchanged if raised by any step; any other
            exception is logged and converted to a 500 response.
    """
    # Single source of truth for the delay, so the value passed to the
    # service and the value reported in the response message cannot drift.
    run_delay_seconds = 60

    # NOTE(review): this is an ``async def`` handler but every call below is
    # synchronous; if the service performs blocking I/O it will block the
    # event loop for the duration of the request — confirm and consider a
    # plain ``def`` handler or a thread offload.
    try:
        logger.info(
            "Starting deploy_workflow function for user: %s", current_user.email
        )
        workflow_service = WorkflowService()
        logger.debug("WorkflowService initialized")

        # Generate flow from the definition
        flow_func = workflow_service.generate_flow_from_definition(
            workflow_def, current_user
        )

        # Deploy the flow WITHOUT executing it
        deployment_result = workflow_service.deploy_flow_without_execution(
            flow_func=flow_func,
            deployment_name=workflow_def.name,
            work_pool_name="default-process-pool",
            description=workflow_def.workflowDescription,
        )

        # Unpack the result
        deployment_id = deployment_result["deployment_id"]
        flow_id = deployment_result["flow_id"]

        # Schedule the flow run to begin later.
        # This allows immediate response to the frontend.
        flow_run_id = workflow_service.create_delayed_flow_run(
            deployment_id=deployment_id,
            flow_id=flow_id,
            delay_seconds=run_delay_seconds,
        )

        # Use the DAO to update the workflow
        workflow_dao = WorkflowDAO(db)
        workflow_dao.update_prefect_flow_id(workflow_def.id, flow_id)

        logger.info(
            "Workflow deployed successfully. Deployment ID: %s, Flow Run ID: %s",
            deployment_id,
            flow_run_id,
        )
        return {
            "status": "success",
            "message": (
                f"Workflow {workflow_def.name} deployed and scheduled to run "
                f"in {run_delay_seconds} seconds"
            ),
            "deployment_id": deployment_id,
            "flow_id": flow_id,
            "flow_run_id": flow_run_id,
            "deployed_by": current_user.id,
        }
    except HTTPException:
        # Let HTTPExceptions propagate
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which a bare str(e) loses.
        logger.exception("Exception occurred: %s", e)
        # NOTE(review): the detail echoes the internal error text to the
        # client; kept as-is for compatibility, but consider a generic message.
        raise HTTPException(
            status_code=500, detail=f"Failed to deploy workflow: {str(e)}"
        ) from e
def create_delayed_flow_run(self, deployment_id: str, flow_id: str, delay_seconds: int = 60, parameters: dict = None, request_timeout: float = 30.0) -> str:
    """
    Create a flow run that starts after a specified delay.

    Posts a flow run in the ``SCHEDULED`` state to
    ``{self.prefect_api_url}/flow_runs/`` with a scheduled time of
    "now (UTC) + delay_seconds".

    Args:
        deployment_id: The ID of the deployment (for reference/logging)
        flow_id: The ID of the flow to run
        delay_seconds: Number of seconds to delay the flow run start (default: 60)
        parameters: Optional parameters to pass to the flow run
        request_timeout: Seconds to wait for the API before giving up
            (default: 30.0). Without a timeout the HTTP call can block
            indefinitely.

    Returns:
        The flow run ID

    Raises:
        ValueError: If the API answers with a status other than 200/201.
        Exception: Any other error (e.g. connection failure or timeout) is
            logged and re-raised unchanged.
    """
    if parameters is None:
        parameters = {}

    try:
        # Calculate start time in the future. utcnow() is deprecated, so take
        # an aware UTC "now" instead.
        scheduled_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
            seconds=delay_seconds
        )
        # Format as ISO 8601 with a trailing "Z". tzinfo is dropped first so
        # isoformat() does not emit "+00:00" in front of the "Z".
        scheduled_time_str = (
            scheduled_time.replace(tzinfo=None).isoformat(timespec="milliseconds") + "Z"
        )

        # NOTE(review): deployment_id is not part of this payload — the run is
        # created from flow_id only. Confirm the run still ends up associated
        # with the deployment / work pool as intended.
        run_data = {
            "flow_id": flow_id,
            "parameters": parameters,
            "state": {
                "type": "SCHEDULED",
                "state_details": {
                    "scheduled_time": scheduled_time_str,
                },
            },
        }

        response = requests.post(
            f"{self.prefect_api_url}/flow_runs/",
            json=run_data,
            headers=self.headers,
            timeout=request_timeout,
        )

        if response.status_code not in (200, 201):
            logger.warning(
                f"Failed to create delayed flow run: {response.status_code}, {response.text}"
            )
            raise ValueError(
                f"Failed to create delayed flow run for deployment {deployment_id}"
            )

        flow_run_id = response.json()["id"]
        logger.debug(
            "Created delayed flow run with ID: %s, starting in %s seconds",
            flow_run_id,
            delay_seconds,
        )
        return flow_run_id
    except Exception as e:
        # Log with traceback, then let the caller decide how to handle it.
        logger.exception("Error creating delayed flow run: %s", e)
        raise