gee — 07/17/2025, 3:03 PM
Marvin — 07/17/2025, 3:03 PM
Marvin — 07/17/2025, 3:06 PM
1. Create Separate Deployments

```python
from prefect import flow
from prefect.runner.storage import GitRepository


@flow(name="my_process")
def my_flow():
    # flow logic here
    pass


# Create two deployments
if __name__ == "__main__":
    # Production deployment
    my_flow.from_source(
        source="https://github.com/your/repo",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="my-process-prod",
        work_pool_name="default",  # adjust to your work pool
        tags=["production"],
    )

    # Test deployment (built from a different branch or commit)
    my_flow.from_source(
        source=GitRepository(
            url="https://github.com/your/repo",
            branch="experimental-branch",
        ),
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="my-process-test",
        work_pool_name="default",
        tags=["test"],
    )
```
2. Control Traffic Split
You can control the distribution of work between versions using one of these approaches:
a. Using Flow Parameters:
```python
import random

from prefect import flow
from prefect.deployments import run_deployment


@flow(name="router_flow")
def router_flow(data, test_percentage: int = 20):
    # Flows have no .submit(); trigger the deployed flows by name instead.
    # Assumes the deployed flow accepts a `data` parameter.
    if random.randint(1, 100) <= test_percentage:
        # Call test version
        run_deployment(name="my_process/my-process-test", parameters={"data": data})
    else:
        # Call production version
        run_deployment(name="my_process/my-process-prod", parameters={"data": data})
```
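Since the routing condition is plain Python, it can be sanity-checked outside Prefect entirely. This standalone simulation (illustrative only, no Prefect required) confirms that roughly 20% of runs would be routed to the test deployment:

```python
import random


def routes_to_test(test_percentage: int = 20) -> bool:
    # Same condition used by the router flow above
    return random.randint(1, 100) <= test_percentage


random.seed(42)  # fixed seed so the example is deterministic
n = 100_000
test_runs = sum(routes_to_test() for _ in range(n))
print(f"test fraction: {test_runs / n:.3f}")  # close to 0.20
```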
b. Using Separate Schedules:
```python
from datetime import timedelta

from prefect.client.schemas.schedules import IntervalSchedule

# Production gets the majority of the schedule slots (24 runs/day)
prod_schedule = IntervalSchedule(interval=timedelta(hours=1))

# Test gets fewer slots (6 runs/day)
test_schedule = IntervalSchedule(interval=timedelta(hours=4))
```
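Quick arithmetic on those two intervals shows the split they imply (this is plain `datetime` math, not a Prefect API):

```python
from datetime import timedelta

# Runs per day implied by each schedule interval
prod_runs_per_day = timedelta(days=1) / timedelta(hours=1)  # 24.0
test_runs_per_day = timedelta(days=1) / timedelta(hours=4)  # 6.0

test_share = test_runs_per_day / (prod_runs_per_day + test_runs_per_day)
print(f"test share of runs: {test_share:.0%}")  # 20%
```

So hourly-vs-every-4-hours works out to the same 80/20 split as the parameter-based router.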
3. Monitoring and Comparison
- Add metrics collection to both versions
- Use tags to easily filter and compare results
- Monitor both versions through the Prefect UI
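Once results have been collected per version tag, comparing them is ordinary statistics. The helper below is a hypothetical sketch (not part of Prefect) for computing the relative change of the test version against production:

```python
from statistics import mean


def compare_versions(results: dict[str, list[float]]) -> float:
    """Return the relative change of the test mean vs. the prod mean.

    `results` maps a version tag ("prod"/"test") to the metric values
    collected from that version's flow runs.
    """
    prod_mean = mean(results["prod"])
    test_mean = mean(results["test"])
    return (test_mean - prod_mean) / prod_mean


# Example: the test version produces somewhat higher values
change = compare_versions({"prod": [10.0, 12.0, 11.0], "test": [12.0, 13.0, 14.0]})
print(f"test vs prod: {change:+.1%}")
```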
4. Promotion Process
When ready to promote the test version:
a. If using the same code with different configurations:
- Update the production deployment with the new configuration
b. If using different code versions:
- Deploy the new code to the production deployment
- Archive or delete the test deployment
5. Rollback Plan
- Keep the original deployment configuration documented
- Use deployment versioning to quickly roll back if needed
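One lightweight way to keep the original configuration documented is a JSON snapshot checked into the repo; on rollback you read it back and feed the values to `flow.from_source(...).deploy(...)`. Everything here (field names, paths) is illustrative:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical snapshot of the production deployment's configuration
snapshot = {
    "name": "my-process-prod",
    "entrypoint": "flows/my_flow.py:my_flow",
    "source": "https://github.com/your/repo",
    "tags": ["production"],
}

snapshot_dir = Path(tempfile.mkdtemp())  # in practice, a versioned repo dir
snapshot_path = snapshot_dir / "my-process-prod.json"
snapshot_path.write_text(json.dumps(snapshot, indent=2))

# On rollback, load the snapshot and redeploy with its values
restored = json.loads(snapshot_path.read_text())
```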
Here's a more complete example incorporating monitoring:
```python
from prefect import flow, task, get_run_logger
from prefect.runner.storage import GitRepository


@task
def collect_metrics(version: str, result):
    # Prefect has no built-in metrics module; log the value tagged with
    # the version (or push it to your own metrics backend) instead
    logger = get_run_logger()
    logger.info("process_result version=%s value=%s", version, result)


def perform_work():
    # placeholder for your actual work
    return 0.0


@flow(name="my_process")
def my_flow(version: str = "prod"):
    # Your flow logic here
    result = perform_work()
    # Collect metrics for comparison
    collect_metrics(version, result)
    return result


# Deploy both versions
if __name__ == "__main__":
    my_flow.from_source(
        source="https://github.com/your/repo",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="my-process-prod",
        work_pool_name="default",  # adjust to your work pool
        parameters={"version": "prod"},
        tags=["production"],
    )

    my_flow.from_source(
        source=GitRepository(
            url="https://github.com/your/repo",
            branch="experimental-branch",
        ),
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="my-process-test",
        work_pool_name="default",
        parameters={"version": "test"},
        tags=["test"],
    )
```
With a hint of pride in the elegant solution: This approach gives you the flexibility of true A/B testing while maintaining clear separation between versions and easy rollback capabilities. The use of metrics and tags makes it straightforward to compare performance and make data-driven decisions about which version to promote.