Vincent Chéry (07/08/2025, 3:34 PM):

Marvin (07/08/2025, 3:36 PM):

```python
import inspect
from prefect import flow
from prefect.deployments import Deployment
import pytest


def validate_schedule_parameters(deployment: Deployment) -> None:
    """
    Validates that all scheduled runs have parameters matching the flow's signature.

    Args:
        deployment: The deployment to validate

    Raises:
        ValueError: If any schedule parameters don't match the flow signature
    """
    # Get the flow's signature
    flow_params = inspect.signature(deployment.flow).parameters

    # Check each schedule
    for schedule in deployment.schedules:
        if not hasattr(schedule, 'parameters') or not schedule.parameters:
            continue

        schedule_params = schedule.parameters

        # Validate required parameters are present
        for name, param in flow_params.items():
            if param.default == inspect.Parameter.empty and name not in schedule_params:
                raise ValueError(
                    f"Schedule is missing required parameter '{name}' "
                    f"for flow '{deployment.flow.__name__}'"
                )

        # Validate no extra parameters are present
        for param_name in schedule_params:
            if param_name not in flow_params:
                raise ValueError(
                    f"Schedule contains unknown parameter '{param_name}' "
                    f"for flow '{deployment.flow.__name__}'"
                )


# Example test implementation
def test_deployment_schedule_parameters():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass

    # Create a deployment with a schedule
    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name="test-deployment",
        schedules=[
            {"schedule": "0 0 * * *", "parameters": {"required_param": "value", "optional_param": 100}}
        ]
    )

    # This should not raise any errors
    validate_schedule_parameters(deployment)

    # Test with invalid parameters
    with pytest.raises(ValueError):
        bad_deployment = Deployment.build_from_flow(
            flow=my_flow,
            name="bad-deployment",
            schedules=[
                {"schedule": "0 0 * * *", "parameters": {"unknown_param": "value"}}
            ]
        )
        validate_schedule_parameters(bad_deployment)
```
This solution provides several benefits:
1. Comprehensive Validation: It checks both for missing required parameters and unexpected extra parameters.
2. Early Detection: You can catch parameter mismatches during testing rather than at runtime.
3. Clear Error Messages: The validation provides specific error messages about which parameters are problematic.
To use this in your test suite:
1. Create a test file (e.g., `test_deployments.py`)
2. Define your validation function
3. Write test cases for each of your deployments (a parametrized example is sketched after this list)
4. Run as part of your regular test suite
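For step 3, one pattern is a single parametrized test that runs the validator over every deployment your project builds. This is a minimal sketch; `my_project.deployments` and `ALL_DEPLOYMENTS` are hypothetical placeholders for wherever you construct your `Deployment` objects, and it reuses the `validate_schedule_parameters` helper defined above.

```python
import pytest
from prefect.deployments import Deployment

# Hypothetical: a module in your project that builds and exposes all Deployment objects
from my_project.deployments import ALL_DEPLOYMENTS  # e.g. a list[Deployment]


@pytest.mark.parametrize("deployment", ALL_DEPLOYMENTS, ids=lambda d: d.name)
def test_schedule_parameters_match_flow_signature(deployment: Deployment):
    # Reuses the validate_schedule_parameters helper defined above
    validate_schedule_parameters(deployment)
```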
You can extend this further by:
- Adding type checking for parameter values
- Validating parameter values against any constraints you've defined (see the Pydantic sketch after this list)
- Adding custom validation rules specific to your use case
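As a sketch of the second point, you could describe a flow's parameters with a Pydantic model and validate each schedule's parameters against it. The `MyFlowParameters` model and its constraints below are illustrative assumptions, not part of Prefect's API.

```python
from typing import Any, Dict, List

from pydantic import BaseModel, Field, ValidationError


# Hypothetical model describing my_flow's parameters and constraints
class MyFlowParameters(BaseModel):
    required_param: str
    optional_param: int = Field(default=42, ge=0)


def validate_schedule_parameter_values(schedules: List[Dict[str, Any]]) -> None:
    """Validate each schedule's parameters against the Pydantic model (sketch)."""
    for schedule in schedules:
        params = schedule.get("parameters", {})
        try:
            MyFlowParameters(**params)
        except ValidationError as exc:
            raise ValueError(f"Invalid schedule parameters: {exc}") from exc
```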
Remember that while Prefect does some parameter validation at runtime, having these tests ensures you catch issues early in your development cycle rather than when the scheduled runs actually execute.
This approach is particularly useful in CI/CD pipelines where you want to ensure that deployment configurations are valid before they're deployed to production.

Vincent Chéry (07/08/2025, 3:36 PM):

Marvin (07/08/2025, 3:37 PM):

```python
import inspect
from prefect import flow, get_client
import pytest


async def validate_schedule_parameters(deployment_name: str) -> None:
    """
    Validates that all scheduled runs have parameters matching the flow's signature.

    Args:
        deployment_name: The name of the deployment to validate
            (in the format 'flow-name/deployment-name')

    Raises:
        ValueError: If any schedule parameters don't match the flow signature
    """
    async with get_client() as client:
        # Get the deployment by its "flow-name/deployment-name" identifier
        deployment = await client.read_deployment_by_name(deployment_name)

        # Get the flow function from your source
        # You'll need to import your flow here
        flow_function = your_flow_module.your_flow  # Replace with actual import

        # If the import is a @flow-decorated object, inspect its underlying function
        flow_fn = getattr(flow_function, "fn", flow_function)

        # Get the flow's signature
        flow_params = inspect.signature(flow_fn).parameters

        # Check each schedule
        for schedule in deployment.schedules:
            if not getattr(schedule, "parameters", None):
                continue

            schedule_params = schedule.parameters

            # Validate required parameters are present
            for name, param in flow_params.items():
                if param.default == inspect.Parameter.empty and name not in schedule_params:
                    raise ValueError(
                        f"Schedule is missing required parameter '{name}' "
                        f"for flow in deployment '{deployment_name}'"
                    )

            # Validate no extra parameters are present
            for param_name in schedule_params:
                if param_name not in flow_params:
                    raise ValueError(
                        f"Schedule contains unknown parameter '{param_name}' "
                        f"for flow in deployment '{deployment_name}'"
                    )


# Example test implementation
@pytest.mark.asyncio
async def test_deployment_schedule_parameters():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass

    # Create and deploy the flow (requires a running Prefect API and an existing
    # work pool; the entrypoint and work pool name below are placeholders)
    await my_flow.from_source(
        source=".",  # or your actual source path/URL
        entrypoint="flows.py:my_flow",
    ).deploy(
        name="test-deployment",
        work_pool_name="my-work-pool",
        schedules=[
            {"cron": "0 0 * * *", "parameters": {"required_param": "value", "optional_param": 100}}
        ],
    )

    # Deployments are addressed as "<flow-name>/<deployment-name>"
    # This should not raise any errors
    await validate_schedule_parameters("my-flow/test-deployment")

    # Test with invalid parameters
    await my_flow.from_source(
        source=".",
        entrypoint="flows.py:my_flow",
    ).deploy(
        name="bad-deployment",
        work_pool_name="my-work-pool",
        schedules=[
            {"cron": "0 0 * * *", "parameters": {"unknown_param": "value"}}
        ],
    )
    with pytest.raises(ValueError):
        await validate_schedule_parameters("my-flow/bad-deployment")
```
To use this in your project:
1. Create a test file (e.g., `test_deployments.py`)
2. Modify the validation function to import your actual flows
3. Write test cases for each of your deployments (a bulk-validation sketch follows this list)
4. Run with pytest and the asyncio marker
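For step 3, instead of hard-coding one deployment per test, you could pull every deployment from the API and check each one against a registry of your own flow functions. This is a sketch under a few assumptions: `FLOWS_BY_NAME` and the `my_project.flows` imports are placeholders for your code, and per-schedule parameters are only present on Prefect versions that support them.

```python
import asyncio
import inspect

from prefect import get_client

# Hypothetical registry: Prefect flow name -> the flow function it runs
from my_project.flows import daily_etl, send_report  # hypothetical imports

FLOWS_BY_NAME = {"daily-etl": daily_etl, "send-report": send_report}


async def validate_all_deployments() -> None:
    """Check every deployment's schedule parameters against its flow's signature (sketch)."""
    async with get_client() as client:
        for deployment in await client.read_deployments():
            prefect_flow = await client.read_flow(deployment.flow_id)
            flow_function = FLOWS_BY_NAME.get(prefect_flow.name)
            if flow_function is None:
                continue  # no local flow registered for this deployment

            # Unwrap a @flow-decorated object to its underlying function
            flow_params = inspect.signature(getattr(flow_function, "fn", flow_function)).parameters

            for deployment_schedule in deployment.schedules:
                params = getattr(deployment_schedule, "parameters", None) or {}
                unknown = set(params) - set(flow_params)
                if unknown:
                    raise ValueError(
                        f"Deployment '{deployment.name}' schedule has unknown parameters: {sorted(unknown)}"
                    )


if __name__ == "__main__":
    asyncio.run(validate_all_deployments())
```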
Some important notes about this updated version:
1. It uses the Prefect client to read deployment information asynchronously
2. It works with the new deployment model in Prefect 3.x
3. You'll need to have your flows actually deployed to test them (see the skip-guard sketch after this list)
4. The test requires the `pytest-asyncio` package for async test support
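Because these tests only make sense against a live API with deployments in place, one option is to guard them with a skip marker. A minimal sketch, assuming you rely on the `PREFECT_API_URL` environment variable to point at your server or Cloud workspace, and that a `my-flow/test-deployment` deployment (hypothetical name) exists:

```python
import os

import pytest

# Skip API-dependent tests when no Prefect API is configured
requires_prefect_api = pytest.mark.skipif(
    not os.environ.get("PREFECT_API_URL"),
    reason="PREFECT_API_URL is not set; skipping deployment validation tests",
)


@requires_prefect_api
@pytest.mark.asyncio
async def test_deployment_schedule_parameters_against_live_api():
    # Reuses the async validate_schedule_parameters helper defined above
    await validate_schedule_parameters("my-flow/test-deployment")  # hypothetical deployment
```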
You might also want to add these additional validations:
```python
from prefect import get_client
from prefect.client.schemas.schedules import CronSchedule, IntervalSchedule


async def validate_deployment_schedule_types(deployment_name: str) -> None:
    """Additional validation for schedule types and configurations (sketch)."""
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(deployment_name)
        for deployment_schedule in deployment.schedules:
            # Example policy: only allow cron or interval schedules
            if not isinstance(deployment_schedule.schedule, (CronSchedule, IntervalSchedule)):
                raise ValueError(f"Unexpected schedule type in deployment '{deployment_name}'")
```

Marvin (07/08/2025, 3:37 PM):

You might also consider:
1. Setting `enforce_parameter_schema=True` in your deployments to ensure runtime validation (a deploy call is sketched below)
2. Using Pydantic models for more robust parameter type checking
3. Adding validation for parameter types, not just parameter names
4. Adding custom validation rules specific to your use case
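On the first point, here is a minimal sketch of a deploy call with schema enforcement turned on. It assumes a Prefect 3.x release whose `Flow.deploy()` accepts `enforce_parameter_schema`; the source, entrypoint, and work pool name are placeholders.

```python
from prefect import flow

flow.from_source(
    source="https://github.com/your-org/your-repo.git",  # hypothetical source
    entrypoint="flows.py:my_flow",  # hypothetical entrypoint
).deploy(
    name="validated-deployment",
    work_pool_name="my-work-pool",  # hypothetical work pool
    enforce_parameter_schema=True,  # reject runs whose parameters don't match the schema
)
```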
This solution provides a way to catch parameter mismatches during testing, before they cause issues in production scheduled runs.

Vincent Chéry (07/08/2025, 3:39 PM):

Marvin (07/08/2025, 3:39 PM):

```python
import inspect
from typing import Any, Callable, Dict

import pytest
from prefect import flow


def validate_schedule_parameters(
    flow_function: Callable,
    schedules: list[Dict[str, Any]]
) -> None:
    """
    Validates that all scheduled runs have parameters matching the flow's signature.

    Args:
        flow_function: The flow function to validate against
        schedules: List of schedule configurations with parameters

    Raises:
        ValueError: If any schedule parameters don't match the flow signature
    """
    # If given a @flow-decorated object, inspect its underlying function
    flow_fn = getattr(flow_function, "fn", flow_function)
    flow_params = inspect.signature(flow_fn).parameters

    for schedule in schedules:
        schedule_params = schedule.get("parameters", {})
        if not schedule_params:
            continue

        # Validate required parameters are present
        for name, param in flow_params.items():
            if param.default == inspect.Parameter.empty and name not in schedule_params:
                raise ValueError(
                    f"Schedule is missing required parameter '{name}' "
                    f"for flow '{flow_fn.__name__}'"
                )

        # Validate no extra parameters are present
        for param_name in schedule_params:
            if param_name not in flow_params:
                raise ValueError(
                    f"Schedule contains unknown parameter '{param_name}' "
                    f"for flow '{flow_fn.__name__}'"
                )


# Example usage in tests
def test_deployment_schedule_parameters():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass

    # Valid schedule configuration
    valid_schedules = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "required_param": "value",
                "optional_param": 100
            }
        }
    ]

    # This should not raise any errors
    validate_schedule_parameters(my_flow, valid_schedules)

    # Invalid schedule configuration (missing required parameter)
    invalid_schedules_missing_param = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "optional_param": 100  # missing required_param
            }
        }
    ]

    with pytest.raises(ValueError) as exc_info:
        validate_schedule_parameters(my_flow, invalid_schedules_missing_param)
    assert "missing required parameter" in str(exc_info.value)

    # Invalid schedule configuration (unknown parameter)
    invalid_schedules_extra_param = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "required_param": "value",
                "unknown_param": "value"
            }
        }
    ]

    with pytest.raises(ValueError) as exc_info:
        validate_schedule_parameters(my_flow, invalid_schedules_extra_param)
    assert "unknown parameter" in str(exc_info.value)


# You can use this in your deployment creation code
def test_deployment_creation():
    @flow
    def my_flow(required_param: str, optional_param: int = 42):
        pass

    schedules = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "required_param": "value",
                "optional_param": 100
            }
        }
    ]

    # Validate before creating deployment
    validate_schedule_parameters(my_flow, schedules)

    # If validation passes, create the deployment (requires a running Prefect API and
    # an existing work pool; the entrypoint and work pool name are placeholders)
    my_flow.from_source(
        source=".",
        entrypoint="flows.py:my_flow",
    ).deploy(
        name="test-deployment",
        work_pool_name="my-work-pool",
        schedules=schedules,
    )
```
This solution offers several advantages:
1. No Client Dependencies: Works without needing to connect to a Prefect server
2. Early Validation: Catches issues before deployment creation

Marvin (07/08/2025, 3:39 PM):

```python
from typing import Any, Callable, Dict

import pytest
from prefect import flow


def validate_parameter_types(
    flow_function: Callable,
    schedules: list[Dict[str, Any]]
) -> None:
    """Validate parameter types match the flow's type hints"""
    # If given a @flow-decorated object, read the underlying function's annotations
    flow_fn = getattr(flow_function, "fn", flow_function)
    hints = flow_fn.__annotations__

    for schedule in schedules:
        schedule_params = schedule.get("parameters", {})
        for param_name, value in schedule_params.items():
            if param_name in hints:
                expected_type = hints[param_name]
                # Note: isinstance only handles plain types (str, int, ...);
                # generics such as Optional[int] would need a richer check
                if not isinstance(value, expected_type):
                    raise TypeError(
                        f"Parameter '{param_name}' should be of type {expected_type.__name__}, "
                        f"got {type(value).__name__} instead"
                    )


def test_parameter_types():
    @flow
    def typed_flow(string_param: str, int_param: int = 42):
        pass

    schedules = [
        {
            "cron": "0 0 * * *",
            "parameters": {
                "string_param": 123,  # Wrong type
                "int_param": 42
            }
        }
    ]

    with pytest.raises(TypeError):
        validate_parameter_types(typed_flow, schedules)
```
You can use these validations in your CI/CD pipeline or as part of your development process:

```python
def validate_deployment_config(flow_function: Callable, schedules: list[Dict[str, Any]]) -> None:
    """Complete validation of deployment configuration"""
    validate_schedule_parameters(flow_function, schedules)
    validate_parameter_types(flow_function, schedules)
    # Add any other validation steps here
```
This approach gives you a robust way to catch parameter-related issues early in your development cycle, without needing to interact with a running Prefect server.

Nate (07/08/2025, 3:45 PM):

Vincent Chéry (07/09/2025, 7:12 AM):