Hey guys, I'm new to prefect, im running version 3...
# ask-community
m
Hey guys, I'm new to Prefect. I'm running version 3 with a local Prefect server on my Docker network. I'm able to register, deploy, and execute functions from my FastAPI server through Prefect's API, but my issue is with scheduling: when I schedule a run, it transitions from "Scheduled" to "Late" and stays there indefinitely without ever running. I have a work pool running, and it is assigned to the function upon registration, so I'm not sure what's happening here. Any help is appreciated, thanks in advance.
c
Hi Macphail - have you also started a worker process that is pulling from the work pool? https://docs.prefect.io/v3/deploy/infrastructure-concepts/workers#learn-about-workers (also for future reference we ask that you put code snippets in a thread instead of as large blocks in the main channel, thank you! 🙏 )
m
@Chris White yes, I have both my Prefect server and a worker process up. When I schedule it directly in the Prefect UI, it runs fine.
👍 1
Noted, will add code into a thread
here is my implementation for reference
Copy code
# src/harmony_backend/api/v1/endpoints/workflow.py

import logging
from typing import Any, Dict
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.orm import Session

from harmony_backend.api.dependencies.auth import get_current_user
from harmony_backend.db.session import get_db
from harmony_backend.schemas.workflows.workflow import WorkflowDefinition, User
from harmony_backend.services.workflows.workflow_service import WorkflowService
from harmony_backend.db.daos.workflow.workflow_dao import WorkflowDAO

# Router on which this module's endpoints are registered (see @router.post below).
router = APIRouter()

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

@router.post("/deploy", response_model=dict)
async def deploy_workflow(
    request: Request,
    workflow_def: WorkflowDefinition,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
) -> Dict[str, Any]:
    """
    Deploy a workflow to Prefect and schedule its first run.

    Requires authentication with a valid JWT token.

    Steps: build a flow from ``workflow_def``, register it as a deployment on
    the ``default-process-pool`` work pool without executing it, create a
    delayed flow run, then store the Prefect flow ID on the workflow record.

    Args:
        request: The incoming request (not read by this handler).
        workflow_def: Definition of the workflow to deploy.
        db: Database session used to persist the Prefect flow ID.
        current_user: The authenticated user performing the deployment.

    Returns:
        A dict with ``status``, ``message``, ``deployment_id``, ``flow_id``,
        ``flow_run_id`` and ``deployed_by``.

    Raises:
        HTTPException: Re-raised unchanged if raised by a callee; any other
            exception is converted to a 500 with the error text in ``detail``.
    """
    # Single source of truth for the delay, so the scheduling call and the
    # message returned to the client cannot drift apart.
    delay_seconds = 60

    try:
        logger.info(f"Starting deploy_workflow function for user: {current_user.email}")
        workflow_service = WorkflowService()
        logger.debug("WorkflowService initialized")

        # Generate flow from the definition
        flow_func = workflow_service.generate_flow_from_definition(workflow_def, current_user)

        # Deploy the flow WITHOUT executing it
        deployment_result = workflow_service.deploy_flow_without_execution(
            flow_func=flow_func,
            deployment_name=workflow_def.name,
            work_pool_name="default-process-pool",
            description=workflow_def.workflowDescription,
        )

        # Unpack the result
        deployment_id = deployment_result["deployment_id"]
        flow_id = deployment_result["flow_id"]

        # Schedule the flow run to begin later.
        # This allows immediate response to the frontend.
        flow_run_id = workflow_service.create_delayed_flow_run(
            deployment_id=deployment_id,
            flow_id=flow_id,
            delay_seconds=delay_seconds,
        )

        # Use the DAO to update the workflow
        workflow_dao = WorkflowDAO(db)
        workflow_dao.update_prefect_flow_id(workflow_def.id, flow_id)

        logger.info(f"Workflow deployed successfully. Deployment ID: {deployment_id}, Flow Run ID: {flow_run_id}")

        return {
            "status": "success",
            "message": f"Workflow {workflow_def.name} deployed and scheduled to run in {delay_seconds} seconds",
            "deployment_id": deployment_id,
            "flow_id": flow_id,
            "flow_run_id": flow_run_id,
            "deployed_by": current_user.id,
        }

    except HTTPException:
        # Let HTTPExceptions propagate
        raise
    except Exception as e:
        # logger.exception keeps the traceback; `from e` preserves the cause.
        logger.exception(f"Exception occurred: {str(e)}")
        raise HTTPException(
            status_code=500, detail=f"Failed to deploy workflow: {str(e)}"
        ) from e
Copy code
def create_delayed_flow_run(self, deployment_id: str, flow_id: str, delay_seconds: int = 60, parameters: dict = None) -> str:
        """
        Create a flow run from a deployment that starts after a specified delay.

        The run is created through the deployment's ``create_flow_run``
        endpoint rather than the bare ``/flow_runs/`` endpoint, so that it is
        attached to the deployment. A run posted to ``/flow_runs/`` with only
        a ``flow_id`` is not tied to a deployment, which matches the observed
        symptom of runs going from "Scheduled" to "Late" without executing.
        NOTE(review): confirm the endpoint and payload against the Prefect
        REST API reference for the server version in use.

        Args:
            deployment_id: The ID of the deployment to create the run from
            flow_id: The ID of the flow (kept for backward compatibility; used for logging only)
            delay_seconds: Number of seconds to delay the flow run start (default: 60)
            parameters: Optional parameters to pass to the flow run

        Returns:
            The flow run ID

        Raises:
            ValueError: If the API responds with a status other than 200/201
        """
        if parameters is None:
            parameters = {}

        try:
            # Timezone-aware "now" in UTC (datetime.utcnow() is deprecated).
            scheduled_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=delay_seconds)
            # ISO 8601 with a trailing 'Z', the same shape the original code sent.
            scheduled_time_str = scheduled_time.isoformat(timespec='milliseconds').replace('+00:00', 'Z')

            # flow_id is deliberately not sent: the deployment already
            # identifies the flow for this endpoint.
            run_data = {
                "parameters": parameters,
                "state": {
                    "type": "SCHEDULED",
                    "state_details": {
                        "scheduled_time": scheduled_time_str
                    }
                }
            }

            response = requests.post(
                f"{self.prefect_api_url}/deployments/{deployment_id}/create_flow_run",
                json=run_data,
                headers=self.headers,
                # Do not let a stalled Prefect API hang the caller indefinitely.
                timeout=30,
            )

            if response.status_code in [200, 201]:
                flow_run = response.json()
                flow_run_id = flow_run["id"]
                logger.debug(
                    f"Created delayed flow run with ID: {flow_run_id} for flow {flow_id}, "
                    f"starting in {delay_seconds} seconds"
                )
                return flow_run_id
            else:
                logger.warning(
                    f"Failed to create delayed flow run: {response.status_code}, {response.text}"
                )
                raise ValueError(f"Failed to create delayed flow run for deployment {deployment_id}")
        except Exception as e:
            logger.error(f"Error creating delayed flow run: {e}")
            raise