<@ULVA73B9P> How can I add an automated test that ...
# ask-marvin
v
@Marvin How can I add an automated test that tests whether my deployments' schedules have associated parameters that match with the signature of the flow ?
m
thought for 70.5 seconds
Based on this, I'll help you create a testing approach. Here's a practical solution that you can implement to test your deployment schedules' parameters against flow signatures:
Copy code
python
import inspect
from prefect import flow
from prefect.deployments import Deployment
import pytest

def validate_schedule_parameters(deployment: Deployment) -> None:
    """
    Validates that all scheduled runs have parameters matching the flow's signature.
    
    Args:
        deployment: The deployment to validate
    
    Raises:
        ValueError: If any schedule parameters don't match the flow signature
    """
    # Get the flow's signature
    flow_params = inspect.signature(deployment.flow).parameters
    
    # Check each schedule
    for schedule in deployment.schedules:
        if not hasattr(schedule, 'parameters') or not schedule.parameters:
            continue
            
        schedule_params = schedule.parameters
        
        # Validate required parameters are present
        for name, param in flow_params.items():
            if param.default == inspect.Parameter.empty and name not in schedule_params:
                raise ValueError(
                    f"Schedule is missing required parameter '{name}' "
                    f"for flow '{deployment.flow.__name__}'"
                )
        
        # Validate no extra parameters are present
        for param_name in schedule_params:
            if param_name not in flow_params:
                raise ValueError(
                    f"Schedule contains unknown parameter '{param_name}' "
                    f"for flow '{deployment.flow.__name__}'"
                )

# Example test implementation
def test_deployment_schedule_parameters():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass
    
    # Create a deployment with a schedule
    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name="test-deployment",
        schedules=[
            {"schedule": "0 0 * * *", "parameters": {"required_param": "value", "optional_param": 100}}
        ]
    )
    
    # This should not raise any errors
    validate_schedule_parameters(deployment)
    
    # Test with invalid parameters
    with pytest.raises(ValueError):
        bad_deployment = Deployment.build_from_flow(
            flow=my_flow,
            name="bad-deployment",
            schedules=[
                {"schedule": "0 0 * * *", "parameters": {"unknown_param": "value"}}
            ]
        )
        validate_schedule_parameters(bad_deployment)
This solution provides several benefits: 1. Comprehensive Validation: It checks both for missing required parameters and unexpected extra parameters. 2. Early Detection: You can catch parameter mismatches during testing rather than at runtime. 3. Clear Error Messages: The validation provides specific error messages about which parameters are problematic. To use this in your test suite: 1. Create a test file (e.g.,
test_deployments.py
) 2. Define your validation function 3. Write test cases for each of your deployments 4. Run as part of your regular test suite You can extend this further by: - Adding type checking for parameter values - Validating parameter values against any constraints you've defined - Adding custom validation rules specific to your use case Remember that while Prefect does some parameter validation at runtime, having these tests ensures you catch issues early in your development cycle rather than when the scheduled runs actually execute. This approach is particularly useful in CI/CD pipelines where you want to ensure that deployment configurations are valid before they're deployed to production.
v
@Marvin The deployment object has no flow attribute
m
thought for 51.6 seconds
Here's an updated solution that works with Prefect 3.x:
Copy code
python
import inspect
from prefect import flow
from prefect.client import get_client
import pytest
import asyncio

async def validate_schedule_parameters(deployment_name: str) -> None:
    """
    Validates that all scheduled runs have parameters matching the flow's signature.
    
    Args:
        deployment_name: The name of the deployment to validate (in format 'deployment/name')
    
    Raises:
        ValueError: If any schedule parameters don't match the flow signature
    """
    async with get_client() as client:
        # Get the deployment
        deployment = await client.read_deployment(deployment_name)
        
        # Get the flow function from your source
        # You'll need to import your flow here
        flow_function = your_flow_module.your_flow  # Replace with actual import
        
        # Get the flow's signature
        flow_params = inspect.signature(flow_function).parameters
        
        # Check each schedule
        for schedule in deployment.schedules:
            if not schedule.parameters:
                continue
                
            schedule_params = schedule.parameters
            
            # Validate required parameters are present
            for name, param in flow_params.items():
                if param.default == inspect.Parameter.empty and name not in schedule_params:
                    raise ValueError(
                        f"Schedule is missing required parameter '{name}' "
                        f"for flow in deployment '{deployment_name}'"
                    )
            
            # Validate no extra parameters are present
            for param_name in schedule_params:
                if param_name not in flow_params:
                    raise ValueError(
                        f"Schedule contains unknown parameter '{param_name}' "
                        f"for flow in deployment '{deployment_name}'"
                    )

# Example test implementation
@pytest.mark.asyncio
async def test_deployment_schedule_parameters():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass
    
    # Create and deploy the flow
    deployment = await my_flow.from_source(
        source=".",  # Or your actual source path/URL
        name="test-deployment"
    ).deploy(
        schedules=[
            {"cron": "0 0 * * *", "parameters": {"required_param": "value", "optional_param": 100}}
        ]
    )
    
    # This should not raise any errors
    await validate_schedule_parameters(f"{deployment.name}/{deployment.entrypoint}")
    
    # Test with invalid parameters
    with pytest.raises(ValueError):
        bad_deployment = await my_flow.from_source(
            source=".",
            name="bad-deployment"
        ).deploy(
            schedules=[
                {"cron": "0 0 * * *", "parameters": {"unknown_param": "value"}}
            ]
        )
        await validate_schedule_parameters(f"{bad_deployment.name}/{bad_deployment.entrypoint}")
To use this in your project: 1. Create a test file (e.g.,
test_deployments.py
) 2. Modify the validation function to import your actual flows 3. Write test cases for each of your deployments 4. Run with pytest and the asyncio marker Some important notes about this updated version: 1. It uses the Prefect client to read deployment information asynchronously 2. It works with the new deployment model in Prefect 3.x 3. You'll need to have your flows actually deployed to test them 4. The test requires the
pytest-asyncio
package for async test support You might also want to add these additional validations: ```python from prefect.schemas.schedules import CronSchedule, IntervalSchedule async def validate_deployment_schedule_types(deployment_name: str) -> None: """Additional validation for schedule types and configurations""" async with get_client() as client:
deployment = await client.read_deployment(deployment_name) for schedule in deployment.schedules: if not isinstance(schedule, (CronSchedule, IntervalSchedule)): raise ValueError(f"Unsupported schedule type: {type(schedule)}") # Add any additional schedule-specific validations here ``` Remember to also consider: 1. Setting
enforce_parameter_schema=True
in your deployments to ensure runtime validation 2. Using Pydantic models for more robust parameter type checking 3. Adding validation for parameter types, not just parameter names 4. Adding custom validation rules specific to your use case This solution provides a way to catch parameter mismatches during testing, before they cause issues in production scheduled runs.
v
@Marvin Is there a solution without calling the client ?
m
thought for 20.6 seconds
Yes, absolutely! We can create a simpler, more direct solution by testing the flow and its schedule parameters at the time of deployment creation. Here's a cleaner approach:
Copy code
python
import inspect
from prefect import flow
import pytest
from typing import Any, Dict, Callable

def validate_schedule_parameters(
    flow_function: Callable,
    schedules: list[Dict[str, Any]]
) -> None:
    """
    Validates that all scheduled runs have parameters matching the flow's signature.
    
    Args:
        flow_function: The flow function to validate against
        schedules: List of schedule configurations with parameters
    
    Raises:
        ValueError: If any schedule parameters don't match the flow signature
    """
    flow_params = inspect.signature(flow_function).parameters
    
    for schedule in schedules:
        schedule_params = schedule.get("parameters", {})
        if not schedule_params:
            continue
            
        # Validate required parameters are present
        for name, param in flow_params.items():
            if param.default == inspect.Parameter.empty and name not in schedule_params:
                raise ValueError(
                    f"Schedule is missing required parameter '{name}' "
                    f"for flow '{flow_function.__name__}'"
                )
        
        # Validate no extra parameters are present
        for param_name in schedule_params:
            if param_name not in flow_params:
                raise ValueError(
                    f"Schedule contains unknown parameter '{param_name}' "
                    f"for flow '{flow_function.__name__}'"
                )

# Example usage in tests
def test_deployment_schedule_parameters():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass
    
    # Valid schedule configuration
    valid_schedules = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "required_param": "value",
                "optional_param": 100
            }
        }
    ]
    
    # This should not raise any errors
    validate_schedule_parameters(my_flow, valid_schedules)
    
    # Invalid schedule configuration (missing required parameter)
    invalid_schedules_missing_param = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "optional_param": 100  # missing required_param
            }
        }
    ]
    
    with pytest.raises(ValueError) as exc_info:
        validate_schedule_parameters(my_flow, invalid_schedules_missing_param)
    assert "missing required parameter" in str(exc_info.value)
    
    # Invalid schedule configuration (unknown parameter)
    invalid_schedules_extra_param = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "required_param": "value",
                "unknown_param": "value"
            }
        }
    ]
    
    with pytest.raises(ValueError) as exc_info:
        validate_schedule_parameters(my_flow, invalid_schedules_extra_param)
    assert "unknown parameter" in str(exc_info.value)

# You can use this in your deployment creation code
def test_deployment_creation():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass
    
    schedules = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "required_param": "value",
                "optional_param": 100
            }
        }
    ]
    
    # Validate before creating deployment
    validate_schedule_parameters(my_flow, schedules)
    
    # If validation passes, create the deployment
    deployment = my_flow.from_source(
        source=".",
        name="test-deployment"
    ).deploy(
        schedules=schedules
    )
This solution offers several advantages: 1. No Client Dependencies: Works without needing to connect to a Prefect server 2. Early Validation: Catches issues before deployment creation
3. Simple Integration: Easy to add to existing test suites 4. Clear Error Messages: Provides specific feedback about parameter mismatches You can extend this further with additional validations:
Copy code
python
from typing import Any, Type

def validate_parameter_types(
    flow_function: Callable,
    schedules: list[Dict[str, Any]]
) -> None:
    """Validate parameter types match the flow's type hints"""
    hints = flow_function.__annotations__
    
    for schedule in schedules:
        schedule_params = schedule.get("parameters", {})
        for param_name, value in schedule_params.items():
            if param_name in hints:
                expected_type = hints[param_name]
                if not isinstance(value, expected_type):
                    raise TypeError(
                        f"Parameter '{param_name}' should be of type {expected_type.__name__}, "
                        f"got {type(value).__name__} instead"
                    )

def test_parameter_types():
    @flow
    def typed_flow(string_param: str, int_param: int = 42):
        pass
    
    schedules = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "string_param": 123,  # Wrong type
                "int_param": 42
            }
        }
    ]
    
    with pytest.raises(TypeError):
        validate_parameter_types(typed_flow, schedules)
You can use these validations in your CI/CD pipeline or as part of your development process:
Copy code
python
def validate_deployment_config(flow_function: Callable, schedules: list[Dict[str, Any]]) -> None:
    """Complete validation of deployment configuration"""
    validate_schedule_parameters(flow_function, schedules)
    validate_parameter_types(flow_function, schedules)
    # Add any other validation steps here
This approach gives you a robust way to catch parameter-related issues early in your development cycle, without needing to interact with a running Prefect server.
n
hi @Vincent Chéry
v
Thanks @Nate!