Amir Kamal
07/08/2025, 3:29 AM

Marvin
07/08/2025, 3:30 AM

```
Work Pool "data-processing" (limit: 10)
├── Queue "critical" (limit: 5)
├── Queue "normal" (limit: 3)
└── Queue "low-priority" (limit: 2)
```
Even though the queue limits sum to 10, if the critical queue is empty but the low-priority queue has 5 runs waiting, the low-priority queue will still run only 2 at a time, respecting its queue-level limit.
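The interaction between the pool-level and queue-level limits can be sketched with a toy model (plain Python, not Prefect's actual scheduler):

```python
# Toy model of pool + queue concurrency limits (not Prefect's implementation).
def runnable_now(pool_limit, queue_limits, waiting):
    """Return how many waiting runs each queue may start right now.

    queue_limits: {queue_name: queue_limit}
    waiting: {queue_name: number_of_waiting_runs}
    """
    started = {}
    capacity = pool_limit  # the pool-level limit caps the total across all queues
    for queue, limit in queue_limits.items():
        n = min(waiting.get(queue, 0), limit, capacity)
        started[queue] = n
        capacity -= n
    return started

# Critical queue empty, low-priority has 5 runs waiting: only 2 start,
# even though the pool has room for 10.
print(runnable_now(10, {"critical": 5, "normal": 3, "low-priority": 2},
                   {"low-priority": 5}))
# {'critical': 0, 'normal': 0, 'low-priority': 2}
```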
You can find more details about work pools and their configuration in the Prefect documentation.

Amir Kamal
07/08/2025, 3:33 AM

Marvin
07/08/2025, 3:35 AM

```python
from prefect import flow


# ETL Flows
@flow
def daily_sales_etl():
    """Heavy ETL job processing daily sales data"""
    pass


@flow
def customer_data_sync():
    """Syncs customer data from CRM"""
    pass


# Analytics Flows
@flow
def generate_executive_dashboard():
    """Creates executive dashboards"""
    pass


@flow
def update_metrics():
    """Updates business metrics"""
    pass


# Monitoring Flows
@flow
def health_check():
    """Quick system health check"""
    pass


@flow
def data_quality_check():
    """Validates data quality"""
    pass
```
Here's how you might organize these flows using work pools and queues:
```python
# Deploy flows to different work pools and queues
daily_sales_etl.from_source(
    source="your-repo",
    entrypoint="flows.py:daily_sales_etl",
).deploy(
    name="daily-sales-etl",
    work_pool_name="etl-workload",
    work_queue_name="heavy-processing",
)

health_check.from_source(
    source="your-repo",
    entrypoint="flows.py:health_check",
).deploy(
    name="system-health-check",
    work_pool_name="monitoring",
    work_queue_name="critical",
)
```
Here's the organizational structure:
1. ETL Work Pool (Process-based, concurrency: 5)

```
Work Pool "etl-workload"
├── Queue "heavy-processing" (concurrency: 2)
│   ├── daily_sales_etl
│   └── customer_data_sync
└── Queue "light-processing" (concurrency: 3)
    └── smaller ETL jobs
```

2. Analytics Work Pool (Docker-based, concurrency: 3)

```
Work Pool "analytics"
├── Queue "high-priority" (concurrency: 2)
│   └── generate_executive_dashboard
└── Queue "standard" (concurrency: 1)
    └── update_metrics
```

3. Monitoring Work Pool (Process-based, concurrency: 10)

```
Work Pool "monitoring"
├── Queue "critical" (concurrency: 5)
│   └── health_check
└── Queue "routine" (concurrency: 5)
    └── data_quality_check
```
This setup provides several benefits:
1. Resource Isolation:
- Heavy ETL jobs can't overwhelm the system (limited to 2 concurrent runs)
- Monitoring flows have their own pool with higher concurrency for quick execution
- Analytics jobs run in Docker containers with specific resource limits
2. Priority Management:
- Critical health checks can run immediately in their high-priority queue
- Executive dashboards have priority over routine metric updates
- Heavy ETL jobs are contained to their own queue to prevent resource hogging
3. Infrastructure Flexibility:
- ETL workloads run on process infrastructure for maximum performance
- Analytics runs in Docker for better dependency isolation
- Monitoring runs on process infrastructure for minimal overhead
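Point 2 above (priority management) can be sketched the same way: work is pulled from the highest-priority non-empty queue first. A toy illustration, with lower numbers meaning higher priority and queue names borrowed from the example (not Prefect's actual scheduling logic):

```python
# Toy sketch of priority-ordered queue polling (not Prefect's implementation).
def next_run(queues):
    """queues: list of (priority, queue_name, waiting_runs); lower number wins."""
    for _priority, name, waiting in sorted(queues, key=lambda q: q[0]):
        if waiting:  # skip empty queues, regardless of their priority
            return name, waiting[0]
    return None  # nothing is waiting anywhere

queues = [
    (1, "critical", []),                      # empty, so it is skipped
    (2, "high-priority", ["dashboard-run"]),  # executive dashboards
    (3, "standard", ["metrics-run"]),         # routine metric updates
]
print(next_run(queues))  # ('high-priority', 'dashboard-run')
```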
You can create these work pools through the Prefect UI or using the CLI:
```bash
# Create work pools
prefect work-pool create "etl-workload" --type process
prefect work-pool create "analytics" --type docker
prefect work-pool create "monitoring" --type process

# Create queues (can also be done through UI)
prefect work-queue create "heavy-processing" --pool "etl-workload"
prefect work-queue create "critical" --pool "monitoring"
```
This structure ensures that:
- Critical monitoring jobs can always run (high concurrency, dedicated pool)
- Resource-intensive ETL jobs are properly throttled
- Different types of workloads don't interfere with each other
- Each type of work runs on appropriate infrastructure
The beauty of this setup is that you can adjust concurrency limits and queue priorities without redeploying your flows, making it easy to tune your system's performance as needs change.

Amir Kamal
07/08/2025, 5:03 AM

Marvin
07/08/2025, 5:04 AM

1. Start a worker for your work pool:

```bash
prefect worker start -p "your-work-pool-name"
```
2. Verify worker status in the UI:
- Go to the "Workers" page
- Check if any workers are online and healthy
- Look for any error messages in the worker logs
3. Check work pool settings:
- Ensure the work pool type (Process, Docker, etc.) matches your environment
- Verify the work pool itself has a high enough concurrency limit
- The work pool concurrency must be >= queue concurrency
4. Monitor the worker logs for any errors:

```bash
prefect worker start -p "your-work-pool-name" --log-level DEBUG
```
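The constraint in step 3 (the pool limit must be at least as high as any queue's limit) is easy to sanity-check offline once you have read the limits back, e.g. from the inspect commands. A small helper sketch, using plain data rather than the Prefect client:

```python
# Sketch: flag queues whose concurrency limit exceeds their pool's limit.
# Such a queue can never actually reach its own limit, so runs pile up as "Late".
def misconfigured_queues(pool_limit, queue_limits):
    """pool_limit: int or None (unlimited); queue_limits: {queue_name: limit}."""
    if pool_limit is None:
        return []  # an unlimited pool cannot undercut any queue
    return [name for name, limit in queue_limits.items() if limit > pool_limit]

# A queue limit of 6 against a pool limit of 5 is the gotcha described below.
print(misconfigured_queues(5, {"heavy-processing": 2, "light-processing": 6}))
# ['light-processing']
```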
Common gotchas that might be causing this:
1. Missing Worker: Most common cause - no worker is running to pick up the flows
2. Infrastructure Mismatch: Work pool type doesn't match your environment (e.g., Docker pool but no Docker available)
3. Work Pool Concurrency: If the work pool's limit is lower than the queue's limit of 6
4. Network Issues: If using Prefect Cloud, check if the worker can connect to the API
To fix this:
1. Start a worker if none is running:

```bash
prefect worker start -p "your-work-pool-name"
```
2. If you already have a worker running, check its logs for errors
3. Verify your concurrency settings:

```bash
# Check work pool settings
prefect work-pool inspect "your-work-pool-name"

# Check queue settings
prefect work-queue inspect "your-queue-name"
```
Once you have an active worker running and properly configured, your flows should start moving from "Late" to "Running" status. If they don't, check the worker logs for any specific error messages that might indicate what's blocking execution.
Remember: Setting concurrency limits is like setting tables at a restaurant - you also need staff (workers) to actually serve the customers (run the flows).