Cory Hans
07/25/2025, 3:31 PM

Marvin
07/25/2025, 3:32 PM

Marvin
07/25/2025, 3:33 PM
- prefect worker start with DEBUG logging enabled
- Regularly checking the Prefect UI for worker status
- Setting up alerts for when workers enter a NOT_READY state (a rough alert sketch follows this list)
5. Additional Recommendations:
- Consider implementing a worker health check script that can automatically restart workers if they enter a NOT_READY state
- Document the specific conditions that led to this failure for future reference
- If using a process worker, consider switching to a more robust worker type like Docker (if available in your environment)
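For the alerting piece mentioned above, here's a rough sketch of a webhook-based notifier - the URL and payload shape are placeholders, so adapt them to whatever you use (Slack, Teams, PagerDuty, etc.). You could call it from the health check script discussed below whenever a status check comes back as anything other than ONLINE:
python
import requests

# Placeholder webhook URL - replace with your alerting endpoint
ALERT_WEBHOOK_URL = "https://example.com/your-webhook"

def send_worker_alert(worker_name: str, status: str):
    """Post a simple alert whenever a worker is not ONLINE."""
    if status != "ONLINE":
        requests.post(
            ALERT_WEBHOOK_URL,
            json={"text": f"Prefect worker {worker_name} is {status}"},
            timeout=10,
        )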
Would you like me to provide more specific details about any of these areas? Also, it would be helpful to know:
1. What type of worker are you using (process, docker, etc.)?
2. Are you seeing any specific error messages in the logs?
3. What is your deployment schedule frequency?
This information would help me provide more targeted recommendations for your specific situation.

Cory Hans
07/25/2025, 3:36 PM

Cory Hans
07/25/2025, 3:37 PM

Marvin
07/25/2025, 3:37 PM

Marvin
07/25/2025, 3:39 PM
python
import subprocess
import time
import requests
import logging
from prefect.client import get_client
from prefect.settings import PREFECT_API_URL
import sys
import os

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('worker_health.log'),
        logging.StreamHandler(sys.stdout)
    ]
)

class WorkerHealthMonitor:
    def __init__(self, work_pool_name, worker_name=None, check_interval=60):
        self.work_pool_name = work_pool_name
        self.worker_name = worker_name
        self.check_interval = check_interval
        self.process = None

    async def get_worker_status(self):
        """Check worker status using the Prefect API"""
        try:
            async with get_client() as client:
                workers = await client.read_workers()
                for worker in workers:
                    if (self.worker_name and worker.name == self.worker_name) or \
                       (not self.worker_name and worker.work_pool_name == self.work_pool_name):
                        return worker.status
        except Exception as e:
            logging.error(f"Error checking worker status: {e}")
        return None

    def start_worker(self):
        """Start the Prefect worker"""
        try:
            cmd = ["prefect", "worker", "start", "-p", self.work_pool_name]
            if self.worker_name:
                cmd.extend(["-n", self.worker_name])
            # Start worker process
            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            logging.info(f"Started worker for pool {self.work_pool_name}")
        except Exception as e:
            logging.error(f"Error starting worker: {e}")

    def stop_worker(self):
        """Stop the worker process"""
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=30)
            except subprocess.TimeoutExpired:
                self.process.kill()
            logging.info("Stopped worker process")

    async def monitor(self):
        """Main monitoring loop"""
        while True:
            try:
                status = await self.get_worker_status()
                if status != "ONLINE":
                    logging.warning(f"Worker status is {status}. Attempting restart...")
                    self.stop_worker()
                    time.sleep(5)  # Wait for cleanup
                    self.start_worker()
                else:
                    logging.info("Worker status: ONLINE")
            except Exception as e:
                logging.error(f"Error in monitoring loop: {e}")
            time.sleep(self.check_interval)

# Example usage
if __name__ == "__main__":
    import asyncio

    # Configure these values
    WORK_POOL_NAME = "your-pool-name"
    WORKER_NAME = "your-worker-name"  # Optional
    CHECK_INTERVAL = 60  # seconds

    monitor = WorkerHealthMonitor(
        work_pool_name=WORK_POOL_NAME,
        worker_name=WORKER_NAME,
        check_interval=CHECK_INTERVAL
    )
    try:
        asyncio.run(monitor.monitor())
    except KeyboardInterrupt:
        logging.info("Shutting down monitor...")
        monitor.stop_worker()
To use this script:
1. Save it as worker_monitor.py
2. Install required dependencies (you should already have them with Prefect)
3. Update the configuration values at the bottom of the script
4. Run it:
bash
python worker_monitor.py
The script will:
- Monitor your worker's status every 60 seconds (configurable)
- Log activity to a worker_health.log file
- Automatically restart the worker if it enters a NOT_READY state
- Handle graceful shutdowns
Some important notes:
1. This is a basic example - you might want to add more error handling or notifications
2. Consider running this script as a Windows Service for production use
3. You might want to add additional metrics like memory usage, CPU usage, etc.
4. The script uses async/await for efficient API communication
Would you like me to explain any part of the script in more detail or help you set it up as a Windows Service? Also, given that you mentioned deployments with various schedules, we might want to add monitoring for deployment latency as well - would that be useful?

Cory Hans
07/25/2025, 3:40 PM

Marvin
07/25/2025, 3:40 PM

Marvin
07/25/2025, 3:41 PM
a) Process Worker:
bash
prefect worker start -p "your-pool"
- Pros: Simple, lightweight
- Cons: Limited isolation, as you're experiencing
b) Docker Worker:
bash
prefect worker start -p "your-pool" --type docker
- Pros: Each flow runs in its own container
- Cons: Requires Docker setup
c) Kubernetes Worker:
bash
prefect worker start -p "your-pool" --type kubernetes
- Pros: Full isolation, scalability
- Cons: More complex setup
4. Immediate Improvements:
If you need to stick with the process worker, you can:
a) Set concurrency limits to prevent resource exhaustion:
python
from prefect import flow
from prefect.concurrency import call_with_concurrency_limit

@flow
async def your_flow():
    async with call_with_concurrency_limit("my-deployment", limit=2):
        ...  # your flow code here
b) Implement proper cleanup in your flows:
python
import gc

from prefect import flow

@flow
def your_flow():
    try:
        ...  # your flow code
    finally:
        # cleanup code
        gc.collect()  # force garbage collection if needed
c) Set worker-level concurrency limits:
bash
prefect worker start -p "your-pool" --limit 3
5. Best Practices Moving Forward:
1. Use Docker or Kubernetes workers if possible
2. Implement proper error handling in your flows
3. Set appropriate timeouts
4. Use work queues to segregate different types of workloads
5. Monitor system resources (memory, CPU) actively
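For point 5, here's a minimal resource-check sketch you could fold into the health monitor script above. It assumes the third-party psutil package is installed (pip install psutil), and the 90% threshold is just an illustrative default:
python
import logging
import psutil  # assumed third-party dependency: pip install psutil

def log_system_resources(threshold_percent: float = 90.0):
    """Log CPU/memory usage and warn when either crosses the threshold."""
    cpu = psutil.cpu_percent(interval=1)   # sample CPU over one second
    mem = psutil.virtual_memory().percent  # current memory usage
    logging.info(f"CPU: {cpu:.1f}% | Memory: {mem:.1f}%")
    if cpu > threshold_percent or mem > threshold_percent:
        logging.warning("Resource usage is high - flows may start failing or hanging")

log_system_resources()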
The fact that one failed deployment is affecting others suggests that the process worker might not be the best choice for your use case. Would you be interested in exploring the Docker worker setup? It would provide better isolation while still being relatively simple to set up, and would prevent these kinds of cascade failures.
Also, it would be helpful to know:
1. What kind of resources do your flows typically use?
2. Are there any shared resources between deployments?
3. What's the typical memory footprint of your flows?
This information would help determine the best path forward for your specific use case.

Cory Hans
07/25/2025, 5:16 PM

Marvin
07/25/2025, 5:16 PM

Marvin
07/25/2025, 5:17 PM
1. Start the new worker with DEBUG logging:
bash
prefect worker start -p "your-pool" --log-level DEBUG
2. Monitor both workers during the transition using the Prefect UI
3. Consider using the --limit flag to control concurrency:
bash
prefect worker start -p "your-pool" --limit 3
This way you can ensure your system doesn't get overwhelmed during the transition.
Would you like me to provide more details about any of these aspects or help you set up monitoring for the worker transition?

Cory Hans
07/25/2025, 5:21 PM

Marvin
07/25/2025, 5:21 PM

Marvin
07/25/2025, 5:22 PM
1. Using Flow-Level Concurrency Limits:
python
from prefect import flow
from prefect.concurrency import call_with_concurrency_limit

@flow
async def your_flow():
    async with call_with_concurrency_limit("deployment-a", limit=1):
        ...  # your flow code here
2. Using Deployment Configuration:
python
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    flow=your_flow,
    name="deployment-a",
    parameters={"param": "value"},
    schedule=CronSchedule(cron="0 0 * * *"),
    work_queue_name="default",
    work_pool_name="your-pool",
    enforce_parameter_schema=True,
    tags=["deployment-a"],
    skip_if_running=True  # This is key for your use case
)
3. Using Work Queue Settings:
python
from prefect.client import get_client

async def setup_queue():
    async with get_client() as client:
        await client.create_work_queue(
            name="deployment-a-queue",
            work_pool_name="your-pool",
            concurrency_limit=1
        )
For your specific scenario of wanting only one late run to execute, here are the recommended approaches:
1. Best Option: Use skip_if_running with Concurrency Limits:
python
from prefect import flow
from prefect.deployments import Deployment

@flow(retries=0)
def your_flow():
    ...  # your flow code here

deployment = flow.from_source(
    source="your-source",
    name="deployment-a",
    skip_if_running=True,  # Skips new runs if one is already running
    tags=["deployment-a"]
).deploy(
    work_pool_name="your-pool",
    work_queue_name="deployment-a-queue"
)
2. Create a Concurrency Limit for the Deployment:
bash
prefect concurrency-limit create deployment-a 1
Then in your flow:
python
from prefect import flow
from prefect.concurrency import call_with_concurrency_limit

@flow
async def your_flow():
    async with call_with_concurrency_limit("deployment-a", limit=1):
        ...  # your flow code here
3. Configure Late Run Behavior:
You can set the environment variable PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS to control when runs are considered late, and combine this with concurrency limits to manage execution.
4. Work Queue Concurrency:
Create a dedicated work queue for the deployment with a concurrency limit:
python
from prefect.client import get_client

async def setup_queue():
    async with get_client() as client:
        await client.create_work_queue(
            name="deployment-a-queue",
            work_pool_name="your-pool",
            concurrency_limit=1
        )
The most robust solution would be combining these approaches:
python
from prefect import flow
from prefect.concurrency import call_with_concurrency_limit

@flow(retries=0)
async def your_flow():
    async with call_with_concurrency_limit("deployment-a", limit=1):
        ...  # your flow code here

deployment = flow.from_source(
    source="your-source",
    name="deployment-a",
    skip_if_running=True,
).deploy(
    work_pool_name="your-pool",
    work_queue_name="deployment-a-queue",
    tags=["deployment-a"]
)
This setup will:
1. Only allow one instance of the flow to run at a time
2. Skip new runs if one is already running
3. Use a dedicated work queue that can have its own concurrency limits
4. Allow you to track and manage the deployment specifically through its tag
Would you like me to explain any of these approaches in more detail or help you implement one of these solutions?

Cory Hans
07/25/2025, 6:25 PM

Marvin
07/25/2025, 6:25 PM

Marvin
07/25/2025, 6:27 PM
1. Using Concurrency Limits in the Flow:
python
from prefect import flow
from prefect.concurrency import call_with_concurrency_limit

@flow
async def your_flow():
    async with call_with_concurrency_limit("deployment-a", limit=1):
        ...  # your flow code here

deployment = flow.from_source(
    source="your-source",
    name="deployment-a"
).deploy(
    work_pool_name="your-pool",
    work_queue_name="deployment-a-queue",
    tags=["deployment-a"]
)
2. Using Work Queue Concurrency:
python
from prefect.client import get_client

async def setup_queue():
    async with get_client() as client:
        await client.create_work_queue(
            name="deployment-a-queue",
            work_pool_name="your-pool",
            concurrency_limit=1
        )
I apologize for the confusion earlier. Would you like me to provide more details about either of these approaches for managing concurrent runs?

Cory Hans
07/25/2025, 6:28 PM

Marvin
07/25/2025, 6:28 PM

Marvin
07/25/2025, 6:29 PM
1. Work Queue Concurrency (Recommended Approach):
python
from prefect import flow
from prefect.client import get_client

async def setup_deployment_concurrency():
    async with get_client() as client:
        # Create a dedicated work queue with a concurrency limit
        await client.create_work_queue(
            name="deployment-a-queue",
            work_pool_name="your-pool",
            concurrency_limit=1  # Only one run at a time
        )

# Then use this queue in your deployment
deployment = flow.from_source(
    source="your-source",
    name="deployment-a"
).deploy(
    work_pool_name="your-pool",
    work_queue_name="deployment-a-queue"  # Use the limited queue
)
2. Global Concurrency Limits (Alternative Approach):
bash
# Create a global concurrency limit for your deployment
prefect gcl create \
--name "deployment-a-limit" \
--limit 1 \
--slot-name "deployment-a"
Then in your flow:
python
from prefect import flow
from prefect.concurrency import call_with_concurrency_limit

@flow
async def your_flow():
    async with call_with_concurrency_limit("deployment-a-limit", limit=1):
        ...  # your flow code here
The key differences:
1. Work Queue Concurrency:
- Controls concurrency at the queue level
- Prevents multiple runs from being picked up by workers
- Manages concurrency before flow execution
- Best for your use case of managing late runs
2. Global Concurrency Limits:
- Controls concurrency at the flow execution level
- Runs are still picked up but wait for execution
- Manages concurrency during flow execution
- Better for resource-based limitations
For your specific case of wanting to control late runs, the Work Queue Concurrency approach would be more appropriate because:
- It prevents multiple runs from being picked up in the first place
- It's managed at the infrastructure level rather than the flow level
- It's more efficient as it doesn't require runs to be picked up before the concurrency is managed
Would you like me to provide more details about setting up and managing work queue concurrency limits?

Cory Hans
07/25/2025, 6:30 PM

Marvin
07/25/2025, 6:30 PM

Marvin
07/25/2025, 6:32 PM

Cory Hans
07/25/2025, 6:41 PM

Marvin
07/25/2025, 6:41 PM

Marvin
07/25/2025, 6:42 PM
1. Configure When Runs Are Considered Late:
bash
export PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS=3600 # 1 hour
2. Auto-cancel Late Runs Using a Flow Configuration:
python
from prefect import flow
from datetime import timedelta

@flow(timeout_seconds=3600)  # Will cancel after 1 hour
def your_flow():
    ...  # your flow code here
3. Create a Custom Late Run Handler:
python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client import get_client
from prefect.states import Cancelled

async def cancel_late_runs(deployment_name: str, hours_late: int = 2):
    async with get_client() as client:
        # NOTE: this reads recent runs and cancels any scheduled run that is too far
        # past its expected start; filter by deployment if you need to scope it down
        flow_runs = await client.read_flow_runs()
        late_threshold = datetime.now(timezone.utc) - timedelta(hours=hours_late)
        for run in flow_runs:
            if (run.expected_start_time
                    and run.expected_start_time < late_threshold
                    and run.state
                    and run.state.is_scheduled()):
                # Cancel the flow run
                await client.set_flow_run_state(
                    run.id,
                    state=Cancelled(message=f"Cancelled automatically - more than {hours_late} hours late")
                )

# Use in your deployment setup
asyncio.run(cancel_late_runs("your-deployment-name", hours_late=2))
4. Using Work Queue Settings (Recommended Approach):
python
from prefect import flow
from prefect.client import get_client

async def setup_queue_with_late_policy():
    async with get_client() as client:
        await client.create_work_queue(
            name="your-queue",
            work_pool_name="your-pool",
            max_late_runs=0  # Don't accept any late runs
        )

# Then use this queue in your deployment
deployment = flow.from_source(
    source="your-source",
    name="deployment-name"
).deploy(
    work_pool_name="your-pool",
    work_queue_name="your-queue"
)
The Work Queue approach is probably the most robust because:
1. It handles the late runs at the infrastructure level
2. It's more efficient (doesn't require additional monitoring)
3. It's more reliable (doesn't depend on custom code running)
Would you like me to provide more details about any of these approaches? Also, what's your specific use case - are you trying to prevent late runs entirely, or just cancel them after a certain period?

Cory Hans
07/25/2025, 6:43 PM

Marvin
07/25/2025, 6:43 PM

Marvin
07/25/2025, 6:45 PM
timeout_seconds only applies to running flows, not to late or unstarted flows. It starts counting only after the flow begins execution.
For handling late runs specifically, you would need to use:
1. The PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS setting to define when runs are considered late
2. A custom solution or work queue configuration to handle what happens to late runs
Would you like me to research alternative approaches specifically for handling late runs? For example, we could look into:
1. Work queue configurations that might help
2. Custom solutions using the Prefect API
3. Other available settings that might be more relevant to late run management

Cory Hans
07/29/2025, 6:04 PM

Marvin
07/29/2025, 6:04 PM

Marvin
07/29/2025, 6:04 PM
The start "" /B syntax won't work as intended. The prefect worker start command is designed to work with one work pool at a time, as indicated by the single --pool option being required.
If you need to start multiple workers for different pools, you'll need to run separate commands. Here are a few approaches you could take:
1. Run separate commands in different terminals:
bash
prefect worker start -p "pool1"
prefect worker start -p "pool2"
2. Use Windows-specific batch scripting to start multiple workers:
batch
start "" /B prefect worker start -p "pool1"
start "" /B prefect worker start -p "pool2"
3. Create a simple batch script (e.g., start-workers.bat):
batch
@echo off
start "" /B prefect worker start -p "pool1"
start "" /B prefect worker start -p "pool2"
Each worker needs its own process because it maintains its own connection to the work pool and manages its own set of running flows. This is by design to ensure proper isolation and resource management.
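If you'd rather drive this from Python than a batch file, here's a rough sketch (the pool names are placeholders) that launches one worker process per pool and waits on them:
python
import subprocess

# Placeholder pool names - replace with your own work pools
POOLS = ["pool1", "pool2"]

def start_workers(pools):
    """Launch one `prefect worker start` process per work pool."""
    return [
        subprocess.Popen(["prefect", "worker", "start", "-p", pool])
        for pool in pools
    ]

if __name__ == "__main__":
    procs = start_workers(POOLS)
    # Block until the workers exit (Ctrl+C in this console interrupts them too)
    for proc in procs:
        proc.wait()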
Let me know if you'd like me to explain more about work pool management or if you need help with a specific implementation approach!

Cory Hans
07/30/2025, 5:38 PM

Marvin
07/30/2025, 5:38 PM

Marvin
07/30/2025, 5:39 PM
a. Pause the deployment:
bash
prefect deployment pause "flow-name/deployment-name"
b. Delete any existing scheduled runs for the deployment that are still pointing to the old work pool:
- You can do this through the UI by going to the deployment's runs page and deleting the scheduled runs
- Or use the Prefect API to delete the scheduled runs (a rough sketch follows this list)
c. Update the deployment with the new work pool assignment:
python
flow.from_source(...).deploy(
    name="your-deployment-name",
    work_pool_name="new-pool-name"
)
d. Resume the deployment:
bash
prefect deployment resume "flow-name/deployment-name"
3. Verification:
- After following these steps, check the deployment's scheduled runs
- All new scheduled runs should only be assigned to the new work pool
- You can verify this in the UI under the deployment's "Runs" tab
4. Prevention:
- When updating work pool assignments, it's best to follow this pause-clean-update-resume pattern
- This ensures a clean transition and prevents any runs from being scheduled to the old work pool
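For step b, here's a rough sketch of clearing the leftover scheduled runs through the Python client - the deployment name is a placeholder, and on a busy server you'd want to filter the flow runs server-side instead of listing everything:
python
import asyncio
from prefect import get_client

async def delete_scheduled_runs(deployment_name: str):
    """Delete flow runs still in a Scheduled state for one deployment."""
    async with get_client() as client:
        dep = await client.read_deployment_by_name(deployment_name)  # "flow-name/deployment-name"
        runs = await client.read_flow_runs()
        for run in runs:
            if run.deployment_id == dep.id and run.state and run.state.is_scheduled():
                await client.delete_flow_run(run.id)
                print(f"Deleted scheduled run {run.name} ({run.id})")

asyncio.run(delete_scheduled_runs("your-flow/your-deployment-name"))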
Let me know if you need help with any of these steps or if you're still seeing runs scheduled to the old work pool after following this process!

Cory Hans
08/01/2025, 9:33 PM

Marvin
08/01/2025, 9:33 PM

Marvin
08/01/2025, 9:34 PM
- Cause: start_time and end_time settings can cause overlapping schedules
- Solution:
- Double-check schedule parameters
- Ensure schedules don't have overlapping time windows
- Use the schedule preview feature to verify expected run times
3. Concurrency and Timing Issues
- Cause: Race conditions can occur when a flow run is still executing while its next scheduled instance is triggered
- Solution:
- Set appropriate concurrency limits on your deployments
- Ensure flow run duration doesn't exceed the schedule interval
- Consider using flow-level concurrency controls
4. Infrastructure/Agent Issues
- Cause: Sometimes infrastructure or agent issues can lead to duplicate submissions
- Solution:
- Keep Prefect and its dependencies up to date
- Monitor agent logs for unusual behavior
- Consider using unique run keys for critical workflows
5. Server Scheduling Edge Cases
- Cause: In some cases, the scheduler might emit multiple flow-run creation events for the same scheduled timestamp
- Solution:
- Implement deduplication logic if necessary (a rough sketch follows this list)
- Consider using custom run keys
- Monitor scheduled runs and their actual execution times
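Here's a rough sketch of that kind of deduplication guard inside a flow. It lists recent flow runs and exits early if another run of the same deployment is already running - treat it as illustrative (on a busy server you'd filter the runs server-side rather than listing them all):
python
from prefect import flow, get_client
from prefect.runtime import deployment, flow_run

@flow
async def deduplicated_flow():
    async with get_client() as client:
        runs = await client.read_flow_runs()
    # Compare IDs as strings since the runtime module exposes them as strings
    duplicates = [
        r for r in runs
        if str(r.deployment_id) == str(deployment.id)
        and str(r.id) != str(flow_run.id)
        and r.state is not None
        and r.state.is_running()
    ]
    if duplicates:
        print("Another run of this deployment is already running - exiting early")
        return
    ...  # the actual work goes here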
Best Practices to Prevent Duplicates:
1. Use unique run keys for critical workflows where exactly-once execution is required
2. Implement appropriate concurrency controls
3. Allow adequate buffer time between schedule intervals
4. Regularly monitor your deployment schedules and runs
5. Keep your Prefect installation updated to the latest version
Note: If you're experiencing this issue, it would be helpful to know:
- Your specific schedule configuration
- Your Prefect version
- Whether the duplicates occur at regular intervals or randomly
- The timing between duplicate runs
This would allow me to provide more targeted solutions for your specific case.

Cory Hans
08/01/2025, 9:35 PM

Marvin
08/01/2025, 9:35 PM

Marvin
08/01/2025, 9:36 PM
1. Global Concurrency Limits (GCLs)
- Can be managed via:
- CLI: prefect gcl
- API
- UI
- Terraform
2. Work Pool Concurrency
- Set at the work pool level
- Controls how many flow runs can execute simultaneously within a work pool
3. Work Queue Concurrency
- Controls concurrent flow runs within a specific work queue
The flow-level concurrency that was available in Prefect 2.x is indeed no longer available in 3.x. Instead, you'll need to use a combination of the above methods to achieve similar control over concurrent executions.
For Handling Duplicate Runs:
Without flow-level concurrency, here are the recommended approaches:
1. Use Global Concurrency Limits (GCLs) with specific tags or names
2. Configure work queue concurrency appropriately
3. Implement custom deduplication logic in your flows if needed
4. Use unique run keys for critical workflows where exactly-once execution is required
If you need exact flow-level concurrency control similar to what was available in 2.x, you might need to implement this at the application level or use a combination of GCLs and tags to achieve similar results.
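As one example of that application-level approach, here's a rough sketch using a global concurrency limit from inside the flow. It assumes the concurrency context manager in prefect.concurrency.asyncio is available in your Prefect 3.x version and that a GCL named "deployment-a-limit" was created beforehand (e.g. via prefect gcl create):
python
from prefect import flow
from prefect.concurrency.asyncio import concurrency  # assumed available in your Prefect 3.x install

@flow
async def your_flow():
    # Occupy one slot of the pre-created GCL; a second run waits here until the slot frees up
    async with concurrency("deployment-a-limit", occupy=1):
        ...  # your flow code here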
Would you like me to provide more specific details about implementing any of these approaches?

Cory Hans
08/01/2025, 9:44 PM

Marvin
08/01/2025, 9:44 PM

Marvin
08/01/2025, 9:45 PM
Deployment schedules are stored in the deployment_schedule table. Here's how you can inspect it:
1. Direct Database Inspection
The default location for the Prefect database is:
- Linux/Mac: ~/.prefect/prefect.db
- Windows: C:\Users\<USERNAME>\.prefect\prefect.db
You can inspect it using SQLite tools:
bash
sqlite3 ~/.prefect/prefect.db
Then run:
sql
SELECT * FROM deployment_schedule WHERE deployment_id = '<your-deployment-id>';
2. Using Prefect API
You can also check schedules programmatically:
python
from prefect.client import get_client

async def check_schedules(deployment_id):
    async with get_client() as client:
        schedules = await client.read_deployment_schedules(deployment_id=deployment_id)
        print(schedules)
3. CLI Check
bash
prefect deployment inspect <deployment-name> --field schedules
If you're seeing duplicate runs but the UI shows only one schedule, I'd recommend:
1. First check the deployment schedule table directly in the database
2. Look at the flow_run table for the scheduled runs and their creation timestamps
3. Check if there are any automation rules that might be creating additional runs
The database schema shows that schedules are linked to deployments through the deployment_id field in the deployment_schedule table, so any orphaned schedules (if they exist) would be visible there.
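If it helps, here's a rough sqlite3 sketch for the first check - the path is the default location mentioned above, and the table/column names are assumed from the schema described here:
python
import sqlite3
from pathlib import Path

# Default SQLite location (adjust if you've relocated the Prefect database)
db_path = Path.home() / ".prefect" / "prefect.db"

conn = sqlite3.connect(str(db_path))
cur = conn.cursor()
# Deployments with more than one schedule row are worth a closer look
cur.execute(
    """
    SELECT deployment_id, COUNT(*) AS schedule_count
    FROM deployment_schedule
    GROUP BY deployment_id
    HAVING COUNT(*) > 1
    """
)
for deployment_id, count in cur.fetchall():
    print(f"Deployment {deployment_id} has {count} schedules")
conn.close()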
Would you like me to provide more specific SQL queries for investigating this issue, or would you prefer guidance on using the Prefect API to inspect the schedules?