Amir Kamal
07/21/2025, 3:38 AM

Marvin
07/21/2025, 3:39 AM

```python
from prefect import get_client
import yaml
import asyncio


async def cleanup_flows():
    async with get_client() as client:
        # Read the deployment configuration from prefect.yaml
        with open("prefect.yaml", "r") as f:
            config = yaml.safe_load(f)

        # Collect the flow names referenced in prefect.yaml
        configured_flows = set()
        for deployment in config.get("deployments", []):
            configured_flows.add(deployment.get("flow_name"))

        # Get all flows registered with the API
        flows = await client.read_flows()

        # Delete flows that aren't in prefect.yaml
        for flow in flows:
            if flow.name not in configured_flows:
                await client.delete_flow(flow.id)


# Run the cleanup
asyncio.run(cleanup_flows())
```
A few important notes:
1. Be cautious with automatic deletion scripts - make sure you have proper backups and testing in place.
2. Consider implementing a "dry run" mode that shows what would be deleted without actually deleting anything (a sketch of this appears after these notes).
3. You might want to add additional safety checks, such as:
- Only deleting flows that haven't run in X days
- Only deleting flows matching certain naming patterns
- Excluding certain flows from automatic deletion
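For example, notes 2 and 3 could be combined into a safer variant of the function above. This is only a minimal sketch: the `EXCLUDE` set, the `TEST_PREFIX` pattern, the `cleanup_flows_safely` name, and the `dry_run` default are illustrative assumptions, not part of the script above.
```python
from prefect import get_client

# Hypothetical safety settings -- adjust to your environment.
EXCLUDE = {"critical-etl", "billing"}   # flows that must never be auto-deleted
TEST_PREFIX = "test_"                   # only target flows matching this pattern


async def cleanup_flows_safely(configured_flows: set[str], dry_run: bool = True) -> None:
    async with get_client() as client:
        flows = await client.read_flows()
        for flow in flows:
            # Skip anything declared in prefect.yaml or explicitly protected.
            if flow.name in configured_flows or flow.name in EXCLUDE:
                continue
            # Only consider flows matching the naming pattern.
            if not flow.name.startswith(TEST_PREFIX):
                continue
            if dry_run:
                print(f"[dry run] would delete flow {flow.name} ({flow.id})")
            else:
                await client.delete_flow(flow.id)
```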
If you're interested in implementing this kind of solution, I can provide more detailed guidance on specific aspects you'd like to focus on.

Amir Kamal
07/21/2025, 3:39 AM

Marvin
07/21/2025, 3:41 AM

This script includes several important features:
1. *Dry Run Mode*: By default, it runs in "dry run" mode, which shows what would be deleted without actually deleting anything.
2. *Inactivity Check*: You can specify `inactive_days` to only delete flows that haven't been run in a certain period (one way to implement this check is sketched after the usage steps below).
3. *Error Handling*: Comprehensive error handling and logging for both YAML parsing and API operations.
4. *Logging*: Detailed logging of all operations and decisions.
To use this script:
1. Save it as `cleanup_flows.py`
2. Run it in dry-run mode first:
```bash
python cleanup_flows.py
```
3. When you're confident it's working correctly, modify the `dry_run` parameter to `False` in the script to perform actual deletions.
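The `inactive_days` behaviour described above could be implemented roughly as follows. This is a sketch that assumes the client's flow-run filter schemas (`FlowFilter`, `FlowFilterId`, `FlowRunSort`); `is_inactive` is a hypothetical helper name, not something defined earlier in this thread.
```python
from datetime import datetime, timedelta, timezone

from prefect.client.schemas.filters import FlowFilter, FlowFilterId
from prefect.client.schemas.sorting import FlowRunSort


async def is_inactive(client, flow, inactive_days: int) -> bool:
    """Return True if the flow has no run more recent than `inactive_days` days."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=inactive_days)
    # Look up the most recent run for this flow, if any.
    runs = await client.read_flow_runs(
        flow_filter=FlowFilter(id=FlowFilterId(any_=[flow.id])),
        sort=FlowRunSort.START_TIME_DESC,
        limit=1,
    )
    if not runs:
        return True  # the flow has never run
    last_start = runs[0].start_time or runs[0].expected_start_time
    return last_start is None or last_start < cutoff
```
A helper like this could then be called from a `should_delete_flow`-style check such as the ones shown in the customization examples below.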
You can customize the script further by:
1. Adding more filtering conditions:
```python
async def should_delete_flow(self, flow, configured_flows: Set[str]) -> bool:
    # Add custom conditions, for example:
    if flow.name.startswith("test_"):  # Only delete test flows
        return True
    return False
```
2. Adding tags or other metadata checks:
```python
# Check flow tags
if "do_not_delete" in flow.tags:
    return False
```
3. Adding backup functionality before deletion:
```python
async def backup_flow(self, flow):
    # Implement flow backup logic here
    pass
```
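If you want the backup stub to do something concrete, one low-tech option is to dump basic flow metadata to a local JSON file before deletion. This is only a sketch; the file naming and the fields captured are assumptions, not an official backup mechanism.
```python
import json


async def backup_flow(self, flow):
    # Persist basic flow metadata locally before the flow is deleted.
    record = {
        "id": str(flow.id),
        "name": flow.name,
        "tags": list(flow.tags or []),
    }
    with open(f"flow_backup_{flow.id}.json", "w") as f:
        json.dump(record, f, indent=2)
```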
Remember to:
- Always test the script in dry-run mode first
- Keep backups of your flow configurations
- Consider implementing a restoration process
- Add appropriate logging and monitoring
- Consider rate limiting if you have many flows to delete
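On the rate-limiting point, a simple way to throttle deletions is to pause briefly between API calls. This is a sketch; the half-second delay and the `delete_with_throttle` helper name are arbitrary assumptions.
```python
import asyncio


async def delete_with_throttle(client, flow_ids, delay_seconds: float = 0.5):
    # Space out delete calls to avoid hammering the API when many flows match.
    for flow_id in flow_ids:
        await client.delete_flow(flow_id)
        await asyncio.sleep(delay_seconds)
```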
Would you like me to elaborate on any particular aspect of this solution?