Gabor Hosszu
07/16/2025, 11:45 AM
    # Kick remote workflow for computer vision processing.
    flow_run = await run_deployment(
        name=f"{settings.PREFECT_COMPUTER_VISION_FLOW_NAME}/{settings.PREFECT_DEPLOYMENT_NAME}",
        parameters={
            "cos_item_link": request.cos_item_link,
            "contact_id": request.contact_id,
            "organization_id": organization_id,
            "ideal_customer_profile": ideal_customer_profile,
            "user_input": request.user_input,
        },
        timeout=0,
    )  # type: ignore
serve
Marvin
07/16/2025, 11:45 AMMarvin
07/16/2025, 11:47 AM<http://127.0.0.1:4200>python
   from prefect import flow, task
   import logging
   # Module-level logger: grab the "prefect" logger and lower its threshold
   # to DEBUG so the debug message emitted by the flow below is not filtered.
   logger = logging.getLogger("prefect")
   logger.setLevel(logging.DEBUG)
   @flow
   def your_flow(**kwargs):
       """Log every keyword argument the flow was started with.

       Args:
           **kwargs: Arbitrary flow parameters; logged as a single dict.
       """
       parameters_message = f"Flow parameters: {kwargs}"
       logger.debug(parameters_message)
       # Your flow code here
   python
   from prefect import flow
   import debugpy
   @flow
   def your_flow(**kwargs):
       """Block at flow start until a debugger client attaches.

       Args:
           **kwargs: Arbitrary flow parameters (unused by the debugger setup).
       """
       # 0.0.0.0 listens on every network interface; port 5678 must match
       # the port used by the attaching debugger configuration.
       listen_address = ("0.0.0.0", 5678)
       debugpy.listen(listen_address)
       # Execution pauses here until a debugger connects.
       debugpy.wait_for_client()
       
       # Your flow code here
   json
   {
       "name": "Attach to Remote Flow",
       "type": "python",
       "request": "attach",
       "connect": {
           "host": "localhost",
           "port": 5678
       }
   }
   python
   # Instead of run_deployment, call the flow directly
   from your_flow_module import your_flow
   
   flow_run = await your_flow(
       cos_item_link=request.cos_item_link,
       contact_id=request.contact_id,
       organization_id=organization_id,
       ideal_customer_profile=ideal_customer_profile,
       user_input=request.user_input
   )
   python
   from prefect import flow, task
   from prefect.logging import get_run_logger
   @task
   async def process_data(data):
       """Log the payload handed to this task.

       Args:
           data: Value to process; it is interpolated into the log message.
       """
       logger = get_run_logger()
       # Restored plain call: the chat export had wrapped `logger.info` in a
       # Slack auto-link (`<http://logger.info|logger.info>`), which is not
       # valid Python.
       logger.info(f"Processing data: {data}")
       # Your processing code here
   @flow
   async def your_flow(**kwargs):
       """Log the parameters the flow was started with.

       Args:
           **kwargs: Arbitrary flow parameters; logged as a single dict.
       """
       logger = get_run_logger()
       # Restored plain call: the chat export had wrapped `logger.info` in a
       # Slack auto-link (`<http://logger.info|logger.info>`), which is not
       # valid Python.
       logger.info(f"Starting flow with parameters: {kwargs}")
       # Your flow code here
   python
   @task
   async def validation_task(data):
       """Return ``data`` unchanged after checking that it is not ``None``.

       Args:
           data: Value to validate.

       Returns:
           The same ``data`` object that was passed in.

       Raises:
           AssertionError: If ``data`` is ``None``.
       """
       logger = get_run_logger()
       logger.debug(f"Validating data: {data}")
       # Explicit raise instead of an `assert` statement: asserts are removed
       # when Python runs with -O, which would silently disable the check.
       # The exception type and message are the same as the original assert.
       if data is None:
           raise AssertionError("Data cannot be None")
       return data
   @flow
   async def your_flow(**kwargs):
       """Run the flow parameters through ``validation_task`` first.

       Args:
           **kwargs: Arbitrary flow parameters; passed to the validation
               task as a single dict.
       """
       validated_parameters = await validation_task(kwargs)
       # Continue with processing
serve
Gabor Hosszu
07/16/2025, 11:48 AMdebugpyMarvin
07/16/2025, 11:49 AMMarvin
07/16/2025, 11:50 AMbash
pip install debugpy
python
import os

import debugpy
from prefect import flow
from prefect.logging import get_run_logger
@flow
async def computer_vision_flow(
    cos_item_link: str,
    contact_id: str,
    organization_id: str,
    ideal_customer_profile: dict,
    user_input: str
):
    """Computer vision flow with an opt-in debugpy attach point.

    When the ``ENABLE_DEBUGPY`` environment variable is present, the flow
    opens a debugpy listener on port 5678 and blocks until a debugger
    attaches, then continues with normal processing.

    Args:
        cos_item_link: Flow parameter (not used by the code shown here).
        contact_id: Flow parameter (not used by the code shown here).
        organization_id: Flow parameter (not used by the code shown here).
        ideal_customer_profile: Flow parameter (not used by the code shown here).
        user_input: Flow parameter (not used by the code shown here).
    """
    # Set up debugpy at the start of your flow
    # Only enable in development/when needed
    # NOTE: this is a presence check, so any value (even "0" or an empty
    # string) enables the debugger; unset the variable to disable it.
    if "ENABLE_DEBUGPY" in os.environ:
        # 0.0.0.0 listens on every network interface.
        debugpy.listen(("0.0.0.0", 5678))
        print("⏳ Waiting for debugger to attach...")
        debugpy.wait_for_client()
        print("🔍 Debugger attached!")
    # Your existing flow code here
    logger = get_run_logger()
    # Restored plain call: the chat export had wrapped `logger.info` in a
    # Slack auto-link (`<http://logger.info|logger.info>`), which is not
    # valid Python.
    logger.info("Starting computer vision processing...")
    # ... rest of your flow code
json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Attach to Prefect Flow",
            "type": "python",
            "request": "attach",
            "connect": {
                "host": "localhost",
                "port": 5678
            },
            "pathMappings": [
                {
                    "localRoot": "${workspaceFolder}",
                    "remoteRoot": "." // Adjust if your remote path differs
                }
            ],
            "justMyCode": true
        }
    ]
}
bash
# Enable debugpy when starting your flow
export ENABLE_DEBUGPY=1
prefect worker start -p your-work-pool
run_deployment
debugpy.wait_for_client()
ENABLE_DEBUGPY
python
if "ENABLE_DEBUGPY" in os.environ:
    debugpy.listen(...)
run_deployment
python
flow_run = await run_deployment(
    name=f"{settings.PREFECT_COMPUTER_VISION_FLOW_NAME}/{settings.PREFECT_DEPLOYMENT_NAME}",
    parameters={...},
    timeout=0  # Return immediately with the flow run metadata instead of waiting; use timeout=None to wait for completion
)
python
debugpy.listen(("0.0.0.0", 5678 + flow_instance_number))