Robyn H
07/29/2025, 9:55 AM
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.Error: <class 'asyncpg.exceptions.DeadlockDetectedError'>: deadlock detected
DETAIL: Process 87824 waits for ShareLock on transaction 7553736; blocked by process 87813. Process 87813 waits for ShareLock on speculative token 3 of transaction 7553735; blocked by process 87824
Do you know why these Deadlock errors occur and how I can prevent them?
Marvin
07/29/2025, 9:56 AM
1. Limit Concurrent Database Operations
python
from asyncio import Semaphore
# Limit concurrent database operations
db_semaphore = Semaphore(50) # Adjust this number based on your system
async with db_semaphore:
    # Your task creation/database operation here
    ...
2. PostgreSQL Configuration Optimizations
- Increase max_connections in your PostgreSQL configuration
- Adjust deadlock_timeout (default is 1s) to be slightly higher
- Consider setting statement_timeout to prevent long-running queries
sql
ALTER SYSTEM SET max_connections = '200';
ALTER SYSTEM SET deadlock_timeout = '2s';
ALTER SYSTEM SET statement_timeout = '30s';
3. Database Transaction Management
- Keep transactions as short as possible
- Ensure consistent ordering of operations across transactions
- Consider using SELECT FOR UPDATE with SKIP LOCKED for queue-like operations
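As an illustration of the SKIP LOCKED pattern, here is a minimal sketch; it assumes an asyncpg-backed SQLAlchemy async engine and a hypothetical work_queue table (not part of Prefect's schema):
python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical connection string; replace with your own database URL
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")

async def claim_next_item():
    """Claim one pending row without blocking on rows other workers already hold."""
    async with engine.begin() as conn:
        row = (await conn.execute(text(
            "SELECT id FROM work_queue "
            "WHERE status = 'pending' "
            "ORDER BY id "
            "FOR UPDATE SKIP LOCKED "
            "LIMIT 1"
        ))).first()
        if row is not None:
            await conn.execute(
                text("UPDATE work_queue SET status = 'claimed' WHERE id = :id"),
                {"id": row.id},
            )
        return row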
4. Application-Level Optimizations
- Implement exponential backoff retry logic for deadlock errors:
python
import sqlalchemy
from prefect import task
from asyncio import sleep
from random import uniform

@task(retries=3)
async def my_task():
    for attempt in range(3):
        try:
            # Your task logic here
            result = ...
            return result
        except sqlalchemy.exc.DBAPIError as e:
            if "deadlock detected" in str(e):
                # Exponential backoff with jitter
                await sleep(uniform(0.1, 0.3) * (2 ** attempt))
                continue
            raise
5. Monitor and Tune Performance
- Watch for the CancellationCleanup warnings you're seeing - they indicate your system is under stress
- Consider increasing the cleanup interval if these operations are consistently taking longer than their allocated time
- Monitor PostgreSQL performance metrics using tools like pg_stat_statements
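If you want to pull those metrics programmatically, a small sketch is below; it assumes the pg_stat_statements extension is enabled and uses the mean_exec_time column name from PostgreSQL 13+ (older versions call it mean_time):
python
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical connection string; point this at the Prefect database
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/prefect")

async def slowest_statements(limit: int = 10) -> None:
    """Print the statements with the highest mean execution time."""
    async with engine.connect() as conn:
        rows = await conn.execute(
            text(
                "SELECT calls, mean_exec_time, query "
                "FROM pg_stat_statements "
                "ORDER BY mean_exec_time DESC "
                "LIMIT :limit"
            ),
            {"limit": limit},
        )
        for row in rows:
            print(f"{row.mean_exec_time:8.1f} ms  x{row.calls}  {row.query[:80]}")

if __name__ == "__main__":
    asyncio.run(slowest_statements())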
6. Infrastructure Considerations
- Ensure your PostgreSQL instance has adequate resources (CPU, memory, IOPS)
- Consider using connection pooling (like PgBouncer) to manage database connections more efficiently
### Immediate Actions to Take
1. First, implement basic retry logic with backoff for your tasks to handle transient deadlocks
2. Review and potentially increase your PostgreSQL max_connections setting
3. Consider implementing batching if you're creating all 100k tasks at once
4. Monitor PostgreSQL logs for detailed deadlock information to identify specific problematic patterns
### Long-term Considerations
If you continue to see these issues after implementing the above solutions, you might want to consider:
1. Partitioning your workflow into smaller, more manageable flows
2. Using a queue system (like Redis) to manage task distribution
3. Implementing more sophisticated concurrency controls at the application level
4. Upgrading your PostgreSQL infrastructure if you determine it's a resource constraint issue
Robyn H
07/29/2025, 11:29 AM
Marvin
07/29/2025, 11:30 AM
Increasing max_connections would not prevent deadlocks. In fact, it could potentially make things worse by:
1. Allowing more concurrent transactions that could participate in deadlock scenarios
2. Consuming more system memory, potentially degrading overall database performance
3. Masking the underlying issues causing the deadlocks
Deadlocks occur when two or more transactions are waiting for each other to release locks in a circular pattern. The number of available connections doesn't affect this fundamental concurrency issue.
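To make that circular wait concrete, here is a toy sketch that reproduces it against a scratch table accounts(id int primary key, balance int); it is purely illustrative and should not be pointed at your Prefect database:
python
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical connection string for a scratch database
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/scratch")

async def transfer(first_id: int, second_id: int) -> None:
    """Lock one row, pause, then try to lock the other."""
    async with engine.begin() as conn:
        await conn.execute(
            text("UPDATE accounts SET balance = balance - 1 WHERE id = :id"),
            {"id": first_id},
        )
        await asyncio.sleep(1)  # give the other transaction time to take its first lock
        await conn.execute(
            text("UPDATE accounts SET balance = balance + 1 WHERE id = :id"),
            {"id": second_id},
        )

async def main() -> None:
    # Opposite lock orders create the circular wait; PostgreSQL aborts one side
    results = await asyncio.gather(transfer(1, 2), transfer(2, 1), return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            print(f"aborted by deadlock detector: {result}")

if __name__ == "__main__":
    asyncio.run(main())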
### What Actually Causes Deadlocks
In your case with 100k tasks, deadlocks are more likely caused by:
1. Multiple concurrent transactions trying to update the same rows in different orders
2. Long-running transactions increasing the window for lock conflicts
3. Inconsistent order of resource acquisition across transactions
### Better Solutions for Deadlock Prevention
Instead of increasing connections, consider these approaches:
1. Consistent Lock Ordering
- Ensure transactions always acquire locks in the same order
- This prevents the circular wait condition necessary for deadlocks
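A minimal sketch of consistent lock ordering; the task_state table and connection URL are hypothetical, and the point is only that every caller touches rows in the same ascending-id order:
python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical connection string
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")

async def bump_attempts(row_ids: list[int]) -> None:
    """Update rows in ascending id order so no two transactions wait in a cycle."""
    async with engine.begin() as conn:
        for row_id in sorted(row_ids):  # fixed acquisition order for every caller
            await conn.execute(
                text("UPDATE task_state SET attempts = attempts + 1 WHERE id = :id"),
                {"id": row_id},
            )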
2. Shorter Transactions
- Keep transactions as brief as possible
- Reduce the window of opportunity for lock conflicts
3. Optimistic Locking
- Where appropriate, use version columns instead of explicit locks
- This can help avoid some lock contention scenarios
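A rough sketch of the version-column approach, again with a hypothetical task_state table; the UPDATE only succeeds if nobody bumped the version since you read it, so no row lock is held while your application does its work:
python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical connection string
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")

async def save_state(row_id: int, new_state: str, expected_version: int) -> bool:
    """Return True if the write won, False if a concurrent writer got there first."""
    async with engine.begin() as conn:
        result = await conn.execute(
            text(
                "UPDATE task_state "
                "SET state = :state, version = version + 1 "
                "WHERE id = :id AND version = :version"
            ),
            {"state": new_state, "id": row_id, "version": expected_version},
        )
        return result.rowcount == 1  # 0 rows means stale version: re-read and retry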
4. Transaction Batching
python
from prefect import flow
from asyncio import gather
@flow
async def process_many_tasks():
    # Process in smaller batches instead of all at once
    batch_size = 1000
    for i in range(0, 100_000, batch_size):
        batch = tasks[i:i + batch_size]
        await gather(*[process_task(t) for t in batch])
5. Retry Logic with Backoff
python
import sqlalchemy
from prefect import task
from asyncio import sleep
from random import uniform

@task(retries=3)
async def task_with_retry():
    for attempt in range(3):
        try:
            # Your database operation here
            result = ...
            return result
        except sqlalchemy.exc.DBAPIError as e:
            if "deadlock detected" in str(e):
                # Add randomness to prevent retry storms
                await sleep(uniform(0.1, 0.3) * (2 ** attempt))
                continue
            raise
Would you like me to elaborate on any of these alternative approaches to handling your deadlock issues?
Robyn H
07/29/2025, 12:57 PM
Marvin
07/29/2025, 12:58 PM
1. Message Broker
- Configured using PREFECT_MESSAGING_BROKER=prefect_redis
2. Caching Layer
- Can serve as a caching mechanism for Prefect
- Configured using PREFECT_MESSAGING_CACHE=prefect_redis
- Helps improve performance for frequently accessed data
### Configuration Example
bash
# Redis configuration environment variables
PREFECT_MESSAGING_BROKER=prefect_redis
PREFECT_MESSAGING_CACHE=prefect_redis
PREFECT_REDIS_MESSAGING_DB=0
PREFECT_REDIS_MESSAGING_HOST=your_redis_host
PREFECT_REDIS_MESSAGING_PORT=6379
### Important Considerations
1. Optional Component
- Redis is not required for Prefect to function - the default setup uses in-memory messaging
- Adding Redis can help with scalability and reliability in larger deployments
2. Known Issues
- There are some reported issues with task and subflow visibility in the UI when using Redis as a message broker (as seen in GitHub issue #18540)
- These UI visibility issues don't affect actual task execution - tasks still run correctly
3. When to Consider Redis
- High-volume workflows with many concurrent tasks
- Distributed setups where in-memory messaging might be a bottleneck
- When you need more robust message persistence
4. Performance Impact
- Can help reduce memory usage on the Prefect server
- May improve reliability in high-concurrency scenarios
- Adds a network hop for message delivery (compared to in-memory)
### Recommendations
1. Start Without Redis
- Begin with the default in-memory setup
- Only add Redis if you encounter specific scaling issues
2. If Implementing Redis
- Monitor the Redis instance for performance
- Be aware of the potential UI visibility issues
- Consider implementing Redis Sentinel or Redis Cluster for high availability if needed
3. Monitoring
- Watch Redis memory usage
- Monitor Redis connection health
- Keep an eye on Prefect logs for any Redis-related issues
Would you like me to elaborate on any of these aspects or provide more specific guidance for your use case?
Robyn H
07/29/2025, 1:49 PM
Marvin
07/29/2025, 1:50 PM
[Flow/Task Execution]
          ↓
[Message Broker (Redis)] → [Prefect Server] → [Database (PostgreSQL/SQLite)]
          ↓                       ↑                        ↑
  [Real-time Events]        [API Requests]          [Data Persistence]
                                  ↑
                               [Web UI]
### Important Distinctions
1. Persistent vs. Transient Data
- Database: Stores persistent state and history
- Redis (when used): Handles transient messaging and events
2. UI Updates
- Historical data: Comes from database queries
- Real-time updates: May come through the message broker
- Final state: Always stored in the database
3. Performance Considerations
- Database queries affect UI responsiveness
- Message broker affects real-time event delivery
- Both systems operate independently
### Common Misconceptions
1. Redis as Data Store
- Redis is NOT used as a primary data store
- It's purely for message brokering and caching
- All persistent data lives in the database
2. UI Data Source
- The UI doesn't read directly from Redis
- All UI data comes via the Prefect server's API
- The server reads from the database to fulfill these requests
This architecture ensures that your flow and task run history is always available in the database, regardless of the message broker's state or configuration. The message broker (Redis) is an optional component that can improve real-time communication but doesn't affect the fundamental storage and retrieval of your workflow data.
Would you like me to elaborate on any of these aspects or explain how this architecture might impact specific use cases?
Robyn H
07/29/2025, 1:52 PM
Marvin
07/29/2025, 1:53 PM
1. Event Flow Chain
Task Completion → Event Generation → Message Broker (Redis) → Server Processing → Database Write → UI Update
Each step must complete for the UI to reflect the current state.
2. Database Write Constraints
- Each task state change requires a database write
- PostgreSQL has limits on concurrent transactions
- Lock contention can occur with many simultaneous writes
- The task_run_recorder service must process each state change
### Common Symptoms
1. UI Lag
- Task states appear delayed in the UI
- Status updates appear in bursts
- Some tasks appear "stuck" in previous states
2. Database Signs
- High number of active connections
- Increased lock wait times
- Possible deadlocks (as you saw in your previous issue)
- task_run_recorder warnings about processing time
### Example Warning Signs
python
WARNING | prefect.server.services.task_run_recorder - TaskRunRecorder took 53.716827 seconds to run, which is longer than its loop interval
### Mitigation Strategies
1. Batch Processing
python
from prefect import flow
from asyncio import gather
@flow
async def process_many_tasks():
# Process in controlled batches
batch_size = 100 # Adjust based on your system
for i in range(0, total_tasks, batch_size):
batch = tasks[i:i+batch_size]
await gather(*[task.submit() for task in batch])
# Allow time for database writes
await asyncio.sleep(0.1)
2. Database Optimization
- Increase max_parallel_workers
- Tune work_mem for write operations
- Consider table partitioning for large task histories
3. Monitoring Points
sql
-- Check for long-running transactions
SELECT pid, now() - xact_start AS xact_runtime, query
FROM pg_stat_activity
WHERE xact_start IS NOT NULL
ORDER BY xact_runtime DESC;
4. Application-Level Controls
- Implement backpressure in task submission
- Use semaphores to limit concurrent database operations
- Consider implementing custom state batching
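For example, a backpressure sketch that wraps task calls in a semaphore; process_item and items are placeholders for your own task and data:
python
import asyncio
from prefect import flow, task

@task
async def process_item(item: int) -> int:
    # Placeholder for your real work
    await asyncio.sleep(0.1)
    return item

@flow
async def backpressured_flow(items: list[int]):
    # At most 25 task runs in flight at once, so state updates hit the
    # database at a bounded rate instead of all at the same time
    semaphore = asyncio.Semaphore(25)

    async def run_one(item: int):
        async with semaphore:
            return await process_item(item)

    return await asyncio.gather(*(run_one(i) for i in items))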
Remember: Redis as a message broker helps with event distribution but doesn't solve database write contention. The database remains the bottleneck for persisting and retrieving task states.
Would you like me to elaborate on any of these aspects or provide more specific optimization strategies for your use case?
Robyn H
07/29/2025, 2:01 PM
Marvin
07/29/2025, 2:02 PM
### 1. Install Redis and the Prefect Integration
bash
# Install Redis server (Ubuntu/Debian)
sudo apt-get install redis-server
# Or for macOS using Homebrew
brew install redis
# Install the Prefect Redis integration
pip install prefect-redis
### 2. Start Redis Server
bash
# Start Redis server (Linux)
sudo systemctl start redis-server
# Or for macOS
brew services start redis
# Verify Redis is running
redis-cli ping
# Should return "PONG"
### 3. Configure Prefect to Use Redis
Create a .env file in your project directory or set environment variables:
bash
# Redis Message Broker Configuration
PREFECT_MESSAGING_BROKER="prefect_redis"
PREFECT_MESSAGING_CACHE="prefect_redis"
PREFECT_REDIS_MESSAGING_HOST="localhost"
PREFECT_REDIS_MESSAGING_PORT="6379"
PREFECT_REDIS_MESSAGING_DB="0"
# Optional: If you've set a Redis password
# PREFECT_REDIS_MESSAGING_PASSWORD="your_password"
### 4. Verify Configuration
Create a simple test script to verify the setup:
python
from prefect import flow, task
import asyncio

@task
async def test_task():
    await asyncio.sleep(1)
    return "Hello from Redis-enabled Prefect!"

@flow
async def test_flow():
    result = await test_task()
    return result

if __name__ == "__main__":
    asyncio.run(test_flow())
### 5. Security Considerations (Optional but Recommended)
Edit your Redis configuration file (/etc/redis/redis.conf on Linux or /usr/local/etc/redis.conf on macOS):
conf
# Basic security settings
requirepass your_secure_password
maxmemory 256mb
maxmemory-policy allkeys-lru
Then update your Prefect environment variables:
bash
PREFECT_REDIS_MESSAGING_PASSWORD="your_secure_password"
### 6. Monitoring Redis
You can monitor Redis using the CLI:
bash
# Connect to Redis CLI
redis-cli
# Monitor Redis commands in real-time
127.0.0.1:6379> MONITOR
# Check Redis info
127.0.0.1:6379> INFO
### 7. Troubleshooting
If you encounter issues, check:
1. Redis Connection
bash
# Test Redis connection
redis-cli ping
2. Redis Logs
bash
# Check Redis logs (Linux)
sudo journalctl -u redis-server
# Or check log file
tail -f /var/log/redis/redis-server.log
3. Common Issues and Solutions
- If Redis won't start:
bash
# Check Redis status
sudo systemctl status redis-server
- If Prefect can't connect:
python
# Test connection in Python
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
print(r.ping()) # Should return True
### 8. Optional: Redis Commander (UI)
If you want a web UI to monitor Redis:
bash
# Install Redis Commander
npm install -g redis-commander
# Run Redis Commander
redis-commander
# Access via browser at http://localhost:8081
### Best Practices
1. Memory Management
conf
# In redis.conf
maxmemory 256mb
maxmemory-policy allkeys-lru
2. Persistence
conf
# In redis.conf
save 900 1
save 300 10
save 60 10000
3. Monitoring
python
# In your Prefect flows, add logging
from prefect import task, get_run_logger

@task
async def monitored_task():
    logger = get_run_logger()
    logger.info("Task started")
    # ... task work ...
    logger.info("Task completed")
Would you like me to elaborate on any of these steps or provide more specific guidance for your environment?
Robyn H
07/29/2025, 2:12 PM
I don't see the Redis host, port, and DB settings (when running prefect config view), is this to be expected?
Marvin
07/29/2025, 2:12 PM
1. First, make sure the prefect-redis package is installed:
bash
pip install prefect-redis
2. Then set these environment variables:
bash
PREFECT_MESSAGING_BROKER="prefect_redis"
PREFECT_MESSAGING_BLOCK="redis/default" # Optional: if you want to use a specific Redis block
The reason you don't see the Redis host, port, and DB settings in prefect config view is because these are managed through Prefect's block system rather than direct configuration variables. Here's how to set it up properly:
1. Create a Redis block in Prefect:
python
from prefect.blocks.system import Redis
redis_block = Redis.from_host_port(
    host="localhost",
    port=6379,
    database=0
)
redis_block.save("default")  # Save with name "default"
Or you can create it via the UI:
1. Go to Blocks in the UI
2. Click the "+" button
3. Select "Redis"
4. Configure with your Redis connection details
5. Save it with the name "default"
This is a more secure and flexible approach as it:
- Keeps sensitive connection information in Prefect's block storage
- Allows you to manage multiple Redis configurations
- Integrates with Prefect's security model
I apologize for my earlier incorrect advice about the environment variables. Would you like me to provide more details about setting up the Redis block or using Redis with Prefect?
Robyn H
07/29/2025, 2:33 PM
Marvin
07/29/2025, 2:34 PM
1. To use Redis as the message broker, you need to:
a. Install the prefect-redis package:
bash
pip install prefect-redis
b. Configure the message broker using environment variables:
bash
PREFECT_MESSAGING_BROKER="prefect_redis"
2. The "Redis Database" block you created in the UI is actually for different purposes:
- It's typically used for storing data or caching
- It's not the same as configuring Redis as a message broker
- You can use it for other Redis-related operations in your flows, but it's not required for message broker functionality
To properly set up Redis as your message broker:
1. Remove the Redis Database block you created (it's not needed for this purpose)
2. Make sure Redis is running on your machine
3. Set the environment variable:
bash
export PREFECT_MESSAGING_BROKER="prefect_redis"
4. Restart your Prefect server
To verify the configuration:
bash
prefect config view
You should see PREFECT_MESSAGING_BROKER="prefect_redis"
in the output.
I apologize for my earlier confusion about blocks. Would you like me to provide more details about:
1. How to verify Redis is working as your message broker?
2. What the Redis Database block is actually used for?
3. How to troubleshoot Redis connectivity issues?
Robyn H
07/29/2025, 2:39 PM
File "/home/rhornwall/.local/lib/python3.11/site-packages/prefect/server/orchestration/rules.py", line 1035, in __aexit__
await self.after_transition(self.context)
File "/home/rhornwall/.local/lib/python3.11/site-packages/prefect/server/orchestration/instrumentation_policies.py", line 56, in after_transition
async with PrefectServerEventsClient() as events:
File "/home/rhornwall/.local/lib/python3.11/site-packages/prefect/server/events/clients.py", line 218, in aenter
publisher = messaging.create_event_publisher()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rhornwall/.local/lib/python3.11/site-packages/prefect/server/events/messaging.py", line 79, in create_event_publisher
publisher = create_publisher(topic="events", deduplicate_by="id")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rhornwall/.local/lib/python3.11/site-packages/prefect/server/utilities/messaging/__init__.py", line 197, in create_publisher
assert isinstance(module, BrokerModule)
AssertionError
Why is this?
Marvin
07/29/2025, 2:42 PM
1. First, verify that Redis is running:
bash
# Check Redis is running
redis-cli ping
# Should return "PONG"
2. Set both required environment variables:
bash
export PREFECT_MESSAGING_BROKER="prefect_redis"
export PREFECT_MESSAGING_TYPE="redis"
3. Verify the Redis package is installed correctly:
bash
pip install -U prefect-redis
4. Restart your Prefect server:
bash
prefect server start
The error you're seeing (AssertionError in messaging.py) typically occurs when:
1. The broker module isn't properly registered
2. The messaging type isn't correctly specified
3. The Redis connection isn't properly configured
To debug this:
1. Check your current configuration:
bash
prefect config view
2. Verify the Redis connection:
python
import redis
r = redis.Redis(host='localhost', port=6379)
try:
    r.ping()
    print("Redis connection successful")
except redis.ConnectionError:
    print("Redis connection failed")
3. If you're still seeing issues, you can enable debug logging:
bash
export PREFECT_LOGGING_LEVEL="DEBUG"
Would you like me to:
1. Provide more detailed debugging steps?
2. Explain how to verify the Redis broker is working correctly?
3. Show how to check the Redis logs for any connection issues?
Robyn H
07/29/2025, 3:09 PM
Marvin
07/29/2025, 3:09 PM
1. First, enable debug logging:
bash
export PREFECT_LOGGING_LEVEL="DEBUG"
2. Make sure you have the latest version of prefect-redis:
bash
pip install -U prefect-redis
3. Try adding these additional environment variables:
bash
export PREFECT_EVENTS_MAXIMUM_INTERVAL_SECONDS="1"
export PREFECT_EVENTS_MINIMUM_INTERVAL_SECONDS="1"
4. If you're running the server locally, try restarting it with these steps:
bash
# Stop any running Prefect processes
pkill -f "prefect*"
# Clear Redis (optional, but can help if there are stuck messages)
redis-cli FLUSHALL
# Start the server again
prefect server start
5. Run a test flow with explicit logging to verify task execution:
python
from prefect import flow, task, get_run_logger
import asyncio

@task
async def test_task(name: str):
    logger = get_run_logger()
    logger.info(f"Running task {name}")
    await asyncio.sleep(1)
    return f"Completed {name}"

@flow
async def test_flow():
    results = await asyncio.gather(
        test_task("A"),
        test_task("B"),
        test_task("C"),
    )
    return results

if __name__ == "__main__":
    asyncio.run(test_flow())
If you're still not seeing task runs in the UI after these steps, we can try:
1. Checking Redis to see if events are being published:
bash
redis-cli
> SUBSCRIBE events
2. Verifying the task run recorder service is running:
bash
ps aux | grep task_run_recorder
3. As a temporary workaround, you could switch back to the memory messaging broker:
bash
unset PREFECT_MESSAGING_BROKER
unset PREFECT_MESSAGING_CACHE
Would you like me to:
1. Provide more detailed debugging steps?
2. Show how to verify if the events are actually reaching Redis?
3. Explain how to monitor the task run recorder service?
Robyn H
07/29/2025, 3:27 PM
Marvin
07/29/2025, 3:28 PM
1. First, let's verify that the prefect_redis messaging module can be loaded:
python
from prefect.utilities.importtools import load_module
try:
    redis_module = load_module("prefect_redis.messaging")
    print("Redis module loaded successfully")
except Exception as e:
    print(f"Error loading Redis module: {e}")
2. Let's check your current environment variables are exactly correct:
bash
# These should match exactly
export PREFECT_MESSAGING_BROKER="prefect_redis.messaging" # Note the .messaging suffix
export PREFECT_MESSAGING_CACHE="prefect_redis.messaging"
export PREFECT_REDIS_MESSAGING_HOST="localhost"
export PREFECT_REDIS_MESSAGING_PORT="6379"
export PREFECT_REDIS_MESSAGING_DB="0"
export PREFECT_LOGGING_LEVEL="DEBUG"
3. Create a test script to verify Redis connectivity:
python
import redis
import asyncio
from prefect import flow, task, get_run_logger

# Test Redis connection
redis_client = redis.Redis(
    host="localhost",
    port=6379,
    db=0
)

@task
async def test_task():
    logger = get_run_logger()
    logger.info("Starting test task")
    # Test Redis write
    redis_client.set("prefect_test", "test_value")
    value = redis_client.get("prefect_test")
    logger.info(f"Redis test value: {value}")
    return "Done"

@flow
async def test_flow():
    return await test_task()

if __name__ == "__main__":
    # Test Redis connection first
    try:
        redis_client.ping()
        print("Redis connection successful")
    except Exception as e:
        print(f"Redis connection failed: {e}")
        exit(1)
    # Run the flow
    asyncio.run(test_flow())
4. Let's also check the Redis pub/sub functionality:
python
import redis
import threading
import time
def listen_for_events():
    r = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = r.pubsub()
    pubsub.subscribe('events')  # This should match Prefect's channel
    print("Listening for events...")
    for message in pubsub.listen():
        print(f"Received: {message}")

# Start listener in background
thread = threading.Thread(target=listen_for_events)
thread.daemon = True
thread.start()

# Keep script running
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping...")
5. Finally, let's verify the Prefect server services:
bash
# Stop any existing Prefect processes
pkill -f "prefect"
# Clear Redis
redis-cli FLUSHALL
# Start Prefect server with explicit service logging
export PREFECT_LOGGING_LEVEL="DEBUG"
export PREFECT_SERVER_SERVICES_LOGGING_LEVEL="DEBUG"
# Start the server
prefect server start
Could you:
1. Run these verification steps and share any errors you see?
2. Check if Redis is actually receiving any messages using the pub/sub test script?
3. Confirm the exact version of prefect-redis you have installed (pip show prefect-redis)?
This will help us pinpoint whether the issue is with:
- Redis connectivity
- Message publishing
- Service initialization
- Event routing
Robyn H
07/30/2025, 11:57 AM
Marvin
07/30/2025, 11:58 AM
bash
# First, stop any running services
prefect server services stop
# Then start them explicitly
prefect server services start
Also, could you run this modified test script that will help us trace the event flow:
python
import redis
import asyncio
from prefect import flow, task, get_run_logger
from prefect.logging import disable_run_logger
from prefect.settings import PREFECT_LOGGING_LEVEL
import logging

# Enable detailed logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Test Redis connection
redis_client = redis.Redis(
    host="localhost",
    port=6379,
    db=0
)

@task
async def test_task():
    logger = get_run_logger()
    logger.info("Starting test task")
    await asyncio.sleep(1)
    logger.info("Task completed")
    return "Done"

@flow(log_prints=True)
async def test_flow():
    print("Flow starting")
    result = await test_task()
    print(f"Flow completed with result: {result}")
    return result

if __name__ == "__main__":
    # Test Redis connection first
    try:
        redis_client.ping()
        print("Redis connection successful")
        # Test pub/sub
        pubsub = redis_client.pubsub()
        pubsub.subscribe('events')
        print("Subscribed to 'events' channel")
    except Exception as e:
        print(f"Redis connection failed: {e}")
        exit(1)

    # Run the flow
    asyncio.run(test_flow())

    # Check for any messages (wait briefly)
    print("\nChecking for Redis messages:")
    for _ in range(5):
        message = pubsub.get_message()
        if message:
            print(f"Received message: {message}")
        asyncio.run(asyncio.sleep(0.1))
This will help us see:
1. If Redis connection is working
2. If events are being published
3. If the task run recorder service is receiving events
Could you run this and share the output? Also, what do you see in the Prefect UI after running this test flow?
Robyn H
07/30/2025, 12:13 PM
python3 test_redis_2.py
Redis connection successful
Subscribed to 'events' channel
14:12:05.922 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/f1e8c539-b36c-4552-a79d-8311eba236f5
14:12:05.947 | INFO | Flow run 'tomato-jacamar' - Beginning flow run 'tomato-jacamar' for flow 'test-flow'
14:12:05.949 | INFO | Flow run 'tomato-jacamar' - View at http://127.0.0.1:4200/runs/flow-run/f1e8c539-b36c-4552-a79d-8311eba236f5
14:12:05.949 | INFO | Flow run 'tomato-jacamar' - Flow starting
14:12:05.971 | INFO | Task run 'test_task-001' - Starting test task
14:12:06.973 | INFO | Task run 'test_task-001' - Task completed
14:12:06.975 | INFO | Task run 'test_task-001' - Finished in state Completed()
14:12:06.976 | INFO | Flow run 'tomato-jacamar' - Flow completed with result: Done
14:12:06.993 | INFO | Flow run 'tomato-jacamar' - Finished in state Completed()
Checking for Redis messages:
Received message: {'type': 'subscribe', 'pattern': None, 'channel': b'events', 'data': 1}
In the Prefect UI, I see the flow run but with zero task runs, so they were clearly not picked up by Redis
Marvin
07/30/2025, 12:13 PM
python
from prefect.server.utilities.messaging import create_publisher, create_consumer
import asyncio
async def test_messaging():
    try:
        publisher = create_publisher(topic="events", deduplicate_by="id")
        print(f"Publisher type: {type(publisher)}")
        consumer = create_consumer(topic="events")
        print(f"Consumer type: {type(consumer)}")
    except Exception as e:
        print(f"Error creating messaging clients: {e}")
        print(f"Error type: {type(e)}")

if __name__ == "__main__":
    asyncio.run(test_messaging())
Could you run this and share what it prints? This will help us verify if Prefect is actually using the Redis messaging implementation.
Also, let's check the Redis logs to see if there are any connection attempts:
bash
# If Redis was installed via apt
sudo tail -f /var/log/redis/redis-server.log
# Or if using Redis from brew on macOS
tail -f /usr/local/var/log/redis.log
Additionally, let's try forcing the task run recorder to be more verbose:
bash
# Stop existing services
prefect server services stop
# Set additional debug variables
export PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_LOGGING_LEVEL="DEBUG"
export PREFECT_SERVER_SERVICES_LOGGING_LEVEL="DEBUG"
# Start services again
prefect server services start
Then run this modified version of the test script that will help us trace the event flow more explicitly:
python
import redis
import asyncio
from prefect import flow, task, get_run_logger
from prefect.logging import disable_run_logger
import logging
from prefect.server.utilities.messaging import create_publisher

# Enable detailed logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

@task
async def test_task():
    logger = get_run_logger()
    logger.info("Starting test task")
    # Try to publish a test event directly
    try:
        publisher = create_publisher(topic="events", deduplicate_by="id")
        await publisher.publish({"test": "message"})
        logger.info("Published test event")
    except Exception as e:
        logger.error(f"Failed to publish test event: {e}")
    await asyncio.sleep(1)
    logger.info("Task completed")
    return "Done"

@flow(log_prints=True)
async def test_flow():
    print("Flow starting")
    result = await test_task()
    print(f"Flow completed with result: {result}")
    return result

if __name__ == "__main__":
    asyncio.run(test_flow())
This modified version will:
1. Try to publish events directly using the messaging system
2. Show us more detailed logging about what's happening with the event publishing
3. Help us determine if the issue is with event publishing or event consumption
Could you:
1. Run the first script to check the messaging configuration
2. Check the Redis logs for any connection attempts
3. Run the modified test flow with the enhanced debugging enabled
4. Share what you see in both the console output and the Redis logs?
This will help us pinpoint exactly where in the event pipeline things are breaking down.
Robyn H
07/30/2025, 12:41 PM
Marvin
07/30/2025, 12:42 PM
python
import redis
import asyncio
from prefect import flow, task, get_run_logger
from prefect.logging import disable_run_logger
import logging
from prefect.server.utilities.messaging import create_publisher
from prefect.server.utilities.events import Event
from datetime import datetime
from zoneinfo import ZoneInfo

# Enable detailed logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

@task
async def test_task():
    logger = get_run_logger()
    logger.info("Starting test task")
    # Try to publish a test event directly
    try:
        publisher = create_publisher(topic="events", deduplicate_by="id")
        # Create a test event
        test_event = Event(
            event="test.event",
            resource={
                "prefect.resource.id": "test-resource",
                "prefect.resource.name": "test-name",
            },
            occurred=datetime.now(ZoneInfo("UTC")),
        )
        await publisher.publish_batch([test_event])
        logger.info("Published test event")
    except Exception as e:
        logger.error(f"Failed to publish test event: {e}")
        logger.error(f"Error type: {type(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
    await asyncio.sleep(1)
    logger.info("Task completed")
    return "Done"

@flow(log_prints=True)
async def test_flow():
    print("Flow starting")
    result = await test_task()
    print(f"Flow completed with result: {result}")
    return result

if __name__ == "__main__":
    asyncio.run(test_flow())
Also, let's check the Redis pub/sub directly while the flow is running. In a separate terminal window, run this script:
python
import redis
import json
from datetime import datetime
import time

def monitor_redis():
    r = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = r.pubsub()
    # Subscribe to all channels
    pubsub.psubscribe('*')
    print("Monitoring Redis messages...")
    start_time = datetime.now()
    try:
        while (datetime.now() - start_time).seconds < 30:  # Monitor for 30 seconds
            message = pubsub.get_message()
            if message and message['type'] not in ('subscribe', 'psubscribe'):
                print(f"\nReceived message at {datetime.now()}:")
                print(f"Channel: {message['channel']}")
                print(f"Data: {message['data']}")
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping monitor...")
    finally:
        pubsub.unsubscribe()

if __name__ == "__main__":
    monitor_redis()
Could you:
1. Run the Redis monitor script in one terminal
2. Run the corrected test flow in another terminal
3. Share both outputs?
This will help us see:
- If events are being properly formatted
- If they're actually reaching Redis
- Where in the pipeline they might be getting lost
Also, could you check if the task run recorder service is running with:
bash
ps aux | grep "prefect server"
This will show us all Prefect server processes, including the task run recorder if it's running.
Robyn H
07/30/2025, 2:06 PM
Marvin
07/30/2025, 2:07 PM
bash
# Stop all services first
prefect server services stop
# Set more verbose logging for the task run recorder
export PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_LOGGING_LEVEL="DEBUG"
export PREFECT_API_SERVICES_TASK_RUN_RECORDER_LOGGING_LEVEL="DEBUG"
export PREFECT_LOGGING_LEVEL="DEBUG"
# Start services explicitly
prefect server services start
Then, let's create a test script that will help us trace the entire event path:
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.server.utilities.messaging import create_publisher, create_consumer
from prefect.server.utilities.events import Event
from datetime import datetime
from zoneinfo import ZoneInfo
import logging

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

async def monitor_events():
    """Monitor the events topic directly"""
    consumer = create_consumer(topic="events")
    print("Starting event monitor...")
    try:
        async for event in consumer.consume():
            print(f"Received event: {event}")
    except Exception as e:
        print(f"Error in monitor: {e}")

@task
async def test_task():
    logger = get_run_logger()
    logger.info("Starting test task")
    await asyncio.sleep(1)
    logger.info("Task completed")
    return "Done"

@flow(log_prints=True)
async def test_flow():
    # Start event monitor in the background
    monitor_task = asyncio.create_task(monitor_events())
    print("Flow starting")
    result = await test_task()
    print(f"Flow completed with result: {result}")
    # Give some time for events to be processed
    await asyncio.sleep(5)
    # Cancel the monitor
    monitor_task.cancel()
    try:
        await monitor_task
    except asyncio.CancelledError:
        pass
    return result

if __name__ == "__main__":
    asyncio.run(test_flow())
Could you:
1. Stop all Prefect services:
bash
prefect server stop
prefect server services stop
2. Clear Redis:
bash
redis-cli FLUSHALL
3. Set these environment variables:
bash
export PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_LOGGING_LEVEL="DEBUG"
export PREFECT_API_SERVICES_TASK_RUN_RECORDER_LOGGING_LEVEL="DEBUG"
export PREFECT_LOGGING_LEVEL="DEBUG"
4. Start the server and services:
bash
prefect server start
5. In a separate terminal, run:
bash
prefect server services ls
to verify all services are running
6. Run the test script above
This will help us see:
1. If events are being published
2. If they can be consumed
3. If the task run recorder service is actually listening for events
The task run recorder should be receiving events from the message broker and then recording them in the database. If we're not seeing any "received event" entries in the logs, it could mean:
1. The events aren't being published correctly
2. The task run recorder isn't subscribed to the correct topic
3. There's a configuration mismatch between the publisher and consumer
Would you like to try these steps and share what you see in the logs?
Robyn H
07/30/2025, 2:21 PM
consumer.consume() which isn't a known attribute
Marvin
07/30/2025, 2:22 PM
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.server.utilities.messaging import create_publisher, create_consumer
import logging
import json

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

async def monitor_events():
    """Monitor the events topic directly"""
    consumer = create_consumer(topic="events")
    print("Starting event monitor...")
    try:
        while True:
            message = await consumer.get()
            if message:
                print(f"Received message: {message}")
    except Exception as e:
        print(f"Error in monitor: {e}")

@task
async def test_task():
    logger = get_run_logger()
    logger.info("Starting test task")
    await asyncio.sleep(1)
    logger.info("Task completed")
    return "Done"

@flow(log_prints=True)
async def test_flow():
    # Start event monitor in the background
    monitor_task = asyncio.create_task(monitor_events())
    print("Flow starting")
    result = await test_task()
    print(f"Flow completed with result: {result}")
    # Give some time for events to be processed
    await asyncio.sleep(5)
    # Cancel the monitor
    monitor_task.cancel()
    try:
        await monitor_task
    except asyncio.CancelledError:
        pass
    return result

if __name__ == "__main__":
    asyncio.run(test_flow())
Also, let's try to directly inspect what's happening in Redis while the flow is running. Create a separate script called `redis_monitor.py`:
python
import redis
import time
from datetime import datetime

def monitor_redis():
    r = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = r.pubsub()
    # Subscribe to all channels
    pubsub.psubscribe('*')
    print(f"{datetime.now()} - Monitoring all Redis channels...")
    try:
        while True:
            message = pubsub.get_message()
            if message and message['type'] not in ('subscribe', 'psubscribe'):
                print(f"\n{datetime.now()} - New message:")
                print(f"Channel: {message['channel']}")
                print(f"Type: {message['type']}")
                print(f"Data: {message['data']}")
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping monitor...")
    finally:
        pubsub.unsubscribe()

if __name__ == "__main__":
    monitor_redis()
Could you:
1. First run the Redis monitor in one terminal:
bash
python redis_monitor.py
2. In another terminal, run the test flow:
bash
python test_flow.py
3. In a third terminal, check the Prefect server logs:
bash
tail -f ~/.prefect/server.log
This will help us see:
1. What messages are actually being published to Redis
2. Whether the task run recorder service is receiving these messages
3. If there are any errors in the server logs
Also, could you check the Redis keys to see what's actually being stored:
bash
redis-cli
> KEYS *
This will show us all keys in Redis and might give us a clue about how the events are being stored/managed.
Robyn H
07/30/2025, 2:33 PM
Marvin
07/30/2025, 2:33 PM
bash
# In redis-cli
KEYS *
TYPE events
TYPE actions
LLEN events # If it's a list
SMEMBERS events # If it's a set
HGETALL events # If it's a hash
Could you run these commands and share what they return?
Also, let's create a more direct test that tries to publish and consume messages using the Redis broker:
python
import asyncio
from prefect.server.utilities.messaging import create_publisher, create_consumer
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def test_messaging():
    # Create publisher and consumer
    publisher = create_publisher(topic="events", deduplicate_by="id")
    consumer = create_consumer(topic="events")
    logger.debug(f"Publisher type: {type(publisher)}")
    logger.debug(f"Consumer type: {type(consumer)}")

    # Try to publish a simple message
    test_message = {"test": "message", "id": "123"}
    logger.debug(f"Attempting to publish: {test_message}")

    # Inspect publisher methods
    logger.debug(f"Available publisher methods: {dir(publisher)}")
    try:
        # Try different publish methods that might be available
        if hasattr(publisher, 'publish'):
            await publisher.publish(test_message)
        elif hasattr(publisher, 'publish_batch'):
            await publisher.publish_batch([test_message])
        elif hasattr(publisher, 'send'):
            await publisher.send(test_message)
        logger.info("Message published")

        # Try to consume the message
        logger.debug("Attempting to consume message...")
        logger.debug(f"Available consumer methods: {dir(consumer)}")

        # Wait briefly for message to be available
        await asyncio.sleep(1)

        # Try different consume methods
        if hasattr(consumer, 'get'):
            message = await consumer.get()
            logger.info(f"Received message via get(): {message}")
        elif hasattr(consumer, 'receive'):
            message = await consumer.receive()
            logger.info(f"Received message via receive(): {message}")
    except Exception as e:
        logger.error(f"Error during messaging test: {e}", exc_info=True)

if __name__ == "__main__":
    asyncio.run(test_messaging())
This script will:
1. Show us exactly what methods are available on the publisher and consumer
2. Try different ways of publishing/consuming messages
3. Give us more detailed logging about what's happening
Could you:
1. Run the Redis inspection commands and share the results
2. Run this test script and share the output
3. While the test script is running, also run this Redis monitoring command in another terminal:
bash
redis-cli monitor
This will help us see:
1. How Redis is storing the events
2. What methods are actually available for messaging
3. If any Redis operations are happening during the test
Also, let's check the exact version of prefect-redis you have installed:
bash
pip show prefect-redis
This information will help us understand exactly what functionality should be available and how to properly use it.