ASHIK
08/22/2025, 8:35 AM
Marvin
08/22/2025, 8:35 AM
Marvin
08/22/2025, 8:41 AM
1) Tag the tasks that write to the database:
from prefect import task

@task(tags={"db-writes"})
def write_to_db(...):
    ...
2) Create a concurrency limit for that tag (example limits writers to 1 at a time):
prefect concurrency-limit create db-writes 1
Docs: Global concurrency limits | CLI: concurrency-limit CLI
- Cap overall run throughput for a work pool
prefect work-pool set-concurrency-limit "my-pool" 10
- Reduce per-flow parallelism
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow():
    ...
- Add retries for transient deadlock errors
from prefect import task

def retry_on_deadlock(task, task_run, state) -> bool:
    # retry_condition_fn receives (task, task_run, state); re-raise the failure
    # via state.result() and test for your driver's deadlock/serialization error.
    try:
        state.result()
    except Exception as exc:
        return "deadlock detected" in str(exc).lower()
    return False

@task(retries=5, retry_delay_seconds=[5, 10, 20], retry_condition_fn=retry_on_deadlock)
def write_to_db(...):
    ...
Database-side practices to prevent deadlocks
- Keep transactions short and consistent: update rows in a fixed order across all tasks, and avoid long-running transactions
- If you’re dequeuing work from a table, use row-level locking patterns (see the sketch after this list):
- SELECT … FOR UPDATE SKIP LOCKED
- Or use advisory locks when appropriate
- Set timeouts instead of waiting indefinitely:
SET lock_timeout = '5s';
SET statement_timeout = '60s';
Handle timeout exceptions with retries.
- Add appropriate indexes to minimize lock durations and row scans
- If you batch writes, keep batches small
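To make the dequeue pattern concrete, here is a minimal sketch of SELECT … FOR UPDATE SKIP LOCKED using plain SQLAlchemy. The jobs table, its columns, and the connection URL are hypothetical placeholders (not anything Prefect provides); adapt them to your schema.
from sqlalchemy import create_engine, text

# Placeholder DSN; "jobs(id, payload, status)" is a hypothetical queue table.
engine = create_engine("postgresql+psycopg2://user:pass@host:5432/appdb")

def claim_next_job():
    # One short transaction: lock a single pending row, skip rows other
    # workers already hold, mark it in progress, and commit immediately.
    with engine.begin() as conn:
        row = conn.execute(
            text(
                """
                SELECT id, payload
                FROM jobs
                WHERE status = 'pending'
                ORDER BY id
                LIMIT 1
                FOR UPDATE SKIP LOCKED
                """
            )
        ).first()
        if row is None:
            return None
        conn.execute(
            text("UPDATE jobs SET status = 'in_progress' WHERE id = :id"),
            {"id": row.id},
        )
        return row
Because each call claims at most one pending row and commits right away, concurrent workers never queue behind each other’s row locks.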
How to diagnose the current deadlock
- Enable logging and shorter deadlock detection:
ALTER SYSTEM SET log_lock_waits = on;
ALTER SYSTEM SET deadlock_timeout = '1s';
SELECT pg_reload_conf();
- Inspect blockers/waiters:
SELECT pid, usename, state, wait_event_type, wait_event, xact_start, query
FROM pg_stat_activity
WHERE datname = current_database()
ORDER BY xact_start;
SELECT a.pid AS waiting_pid,
a.query AS waiting_query,
bl.pid AS blocking_pid,
bl.query AS blocking_query
FROM pg_locks l
JOIN pg_stat_activity a ON a.pid = l.pid
JOIN LATERAL unnest(pg_blocking_pids(a.pid)) AS bl_pid(bl_pid) ON true
JOIN pg_stat_activity bl ON bl.pid = bl_pid.bl_pid;
If you are using Prefect to write via SQLAlchemy, consider the prefect-sqlalchemy integration to manage connections cleanly: prefect-sqlalchemy (and keep each task’s transaction scope tight).
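For illustration, a minimal sketch of a tightly scoped write task, assuming plain SQLAlchemy and a hypothetical my_table upsert (prefect-sqlalchemy could manage the connection/engine for you instead). The tag ties into the db-writes concurrency limit above.
from prefect import task
from sqlalchemy import create_engine, text

# Placeholder DSN and table; adapt to your application database.
engine = create_engine("postgresql+psycopg2://user:pass@host:5432/appdb", pool_pre_ping=True)

@task(tags={"db-writes"}, retries=3, retry_delay_seconds=10)
def write_rows(rows: list[dict]) -> None:
    # Fixed update order across all tasks avoids opposite-order lock acquisition.
    rows = sorted(rows, key=lambda r: r["id"])
    # One short transaction per small batch; SET LOCAL scopes the timeouts
    # to this transaction only.
    with engine.begin() as conn:
        conn.execute(text("SET LOCAL lock_timeout = '5s'"))
        conn.execute(text("SET LOCAL statement_timeout = '60s'"))
        conn.execute(
            text(
                "INSERT INTO my_table (id, value) VALUES (:id, :value) "
                "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value"
            ),
            rows,
        )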
If you can share:
- Which DB is deadlocking (your app DB vs. Prefect API DB)
- A small code/SQL example of the write pattern
- Any relevant Postgres logs
…I can propose a targeted fix and the least invasive concurrency settings to apply.
ASHIK
08/22/2025, 8:51 AM
ASHIK
08/22/2025, 8:53 AM
Marvin
08/22/2025, 8:53 AM
ASHIK
08/22/2025, 8:53 AM
Marvin
08/22/2025, 8:53 AM
Marvin
08/22/2025, 8:55 AM
This is coming from the Prefect server’s own database (the task_run upsert). The deadlock you’re seeing matches a known pattern we’ve seen when multiple processes insert/update the same task_run rows concurrently:
- Example report with the same SQL: prefecthq/prefect#17767
To help quickly, could you share:
- Your Prefect version (assume 3.x, but which exact version?)
- How you’re running the server (Helm, Docker Compose, bare prefect server start), and whether you have multiple API replicas
- Postgres version and any pool settings you’ve applied
Immediate mitigations you can try
- If you’re running more than one API/server replica, scale down to a single replica temporarily to remove intra-server contention on task_run upserts.
- Reduce write pressure by limiting overall concurrency:
- Cap work pool throughput:
prefect work-pool set-concurrency-limit "YOUR_POOL" 10
- Use tag-based concurrency for chatty flows/tasks:
from prefect import task
@task(tags={"db-writes"})
def noisy_task(...):
    ...
Then:
prefect concurrency-limit create db-writes 1
Docs: Global concurrency limits | CLI: concurrency-limit CLI
- If a single flow spawns many tasks simultaneously, reduce per-flow parallelism:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow():
    ...
Database-side steps
- Turn on deadlock diagnostics to confirm blockers:
ALTER SYSTEM SET log_lock_waits = on;
ALTER SYSTEM SET deadlock_timeout = '1s';
SELECT pg_reload_conf();
Then inspect current blockers:
SELECT a.pid AS waiting_pid, a.query AS waiting_query,
bl.pid AS blocking_pid, bl.query AS blocking_query
FROM pg_locks l
JOIN pg_stat_activity a ON a.pid = l.pid
JOIN LATERAL unnest(pg_blocking_pids(a.pid)) AS bl_pid(bl_pid) ON true
JOIN pg_stat_activity bl ON bl.pid = bl_pid.bl_pid;
- Ensure autovacuum is keeping up on the busy Prefect tables; bloat makes the hot queries slower, so locks are held longer (a quick check follows below).
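A minimal way to check, sketched with SQLAlchemy against the Prefect server database (the connection URL is a placeholder; the same query can be run directly in psql):
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://USER:PASS@host:5432/prefect")

with engine.connect() as conn:
    # Dead-tuple counts and last autovacuum times for the busiest tables.
    rows = conn.execute(text("""
        SELECT relname, n_live_tup, n_dead_tup, last_autovacuum, last_autoanalyze
        FROM pg_stat_user_tables
        ORDER BY n_dead_tup DESC
        LIMIT 10
    """))
    for r in rows:
        print(r.relname, r.n_dead_tup, r.last_autovacuum)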
Upgrade recommendation
- If you’re on 3.3.x or earlier, please try the latest 3.4.x — we’ve made several robustness improvements to services that interact with the DB. Latest notes: Prefect releases (most recent: 3.4.14).
Why this happens
- The task_run_recorder service upserts task_run rows with a WHERE clause on state_timestamp to only move state forward. Under high concurrency (or multiple API replicas), two transactions can lock rows in opposite orders (or hit speculative insert + sharelock) and deadlock. Reducing concurrent upserts and/or running a single API replica typically eliminates it.
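As a generic illustration of the opposite-lock-order mechanism (this is not Prefect’s actual code), here is a minimal sketch assuming psycopg2 and a throwaway table created as CREATE TABLE demo (id int PRIMARY KEY, v int) with rows 1 and 2: each transaction locks one row, then tries to take the other’s, and Postgres aborts one of them.
import threading
import psycopg2
from psycopg2 import errors

DSN = "dbname=test user=postgres"  # placeholder connection string
barrier = threading.Barrier(2)

def worker(first_id: int, second_id: int):
    conn = psycopg2.connect(DSN)
    try:
        with conn, conn.cursor() as cur:
            cur.execute("UPDATE demo SET v = v + 1 WHERE id = %s", (first_id,))
            barrier.wait()  # both transactions now hold one row lock each
            cur.execute("UPDATE demo SET v = v + 1 WHERE id = %s", (second_id,))
    except errors.DeadlockDetected as exc:
        print("deadlock detected, transaction rolled back:", exc.pgcode)
    finally:
        conn.close()

t1 = threading.Thread(target=worker, args=(1, 2))
t2 = threading.Thread(target=worker, args=(2, 1))
t1.start(); t2.start(); t1.join(); t2.join()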
If you can confirm:
- Prefect version and deployment method (single vs multiple API replicas)
- Postgres version and whether this is RDS/Aurora, etc.
- Approximate number of concurrent task runs/events
…I can give you a targeted plan (replica count, specific concurrency values, and any DB pool tweaks).
ASHIK
08/22/2025, 9:07 AM
Marvin
08/22/2025, 9:07 AM
Marvin
08/22/2025, 9:08 AM
That matches the known deadlock on task_run (same INSERT ... ON CONFLICT shown here): prefecthq/prefect#17767.
Here’s a focused plan for Aurora + Prefect 3.2:
Priority fixes
1) Run a single API/server replica
- If you’re running multiple API pods/containers, scale down to 1. This removes most contention paths for task_run upserts.
2) Upgrade Prefect server
- There have been robustness improvements after 3.2. Please try the latest 3.4.x (current: 3.4.14). If you can’t jump straight there, at least try ≥3.3.x to see if deadlocks decrease.
Right-size DB connection pool (server)
- Use these env vars to keep the server’s pool modest and recycle idle conns (good practice on Aurora):
PREFECT_SERVER_DATABASE_CONNECTION_URL=postgresql+asyncpg://USER:PASS@your-aurora-writer-endpoint:5432/prefect?application_name=prefect&options=-c%20lock_timeout%3D5s%20-c%20statement_timeout%3D60s
PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_SIZE=10
PREFECT_SERVER_DATABASE_SQLALCHEMY_MAX_OVERFLOW=20
PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_TIMEOUT=30
PREFECT_SERVER_DATABASE_SQLALCHEMY_POOL_RECYCLE=1800
- Settings reference: Prefect settings reference
- Notes:
- Use your Aurora writer endpoint (not a reader).
- The options parameter applies per-session timeouts to avoid long waits.
- POOL_RECYCLE helps avoid stale/idle connections in Aurora.
Reduce write pressure to the server (optional but helpful)
- Cap work-pool concurrency:
prefect work-pool set-concurrency-limit "YOUR_POOL" 10
- Add tag-based concurrency for chatty flows/tasks:
from prefect import task
@task(tags={"db-writes"})
def noisy_task(...):
    ...
Then:
prefect concurrency-limit create db-writes 1
Docs: Global concurrency limits | CLI: concurrency-limit CLI
- Reduce per-flow parallelism if a single flow spawns many tasks at once:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow():
    ...
Aurora parameter group diagnostics
- Enable quick deadlock visibility:
log_lock_waits = 1
deadlock_timeout = 1s
- Then capture blockers/waiters when it occurs:
SELECT a.pid AS waiting_pid, a.query AS waiting_query,
bl.pid AS blocking_pid, bl.query AS blocking_query
FROM pg_locks l
JOIN pg_stat_activity a ON a.pid = l.pid
JOIN LATERAL unnest(pg_blocking_pids(a.pid)) AS bl_pid(bl_pid) ON true
JOIN pg_stat_activity bl ON bl.pid = bl_pid.bl_pid;
A few clarifying questions to tune this further:
- Are you running multiple API/server replicas currently?
- Roughly how many concurrent task runs are active during the spikes?
- Aurora flavor (Provisioned vs Serverless v2) and Postgres version?
If you can try the single-replica change and optionally the upgrade, let me know the results. If deadlocks persist, share the pg_stat_activity blocking pairs and we’ll dig in further.
ASHIK
08/22/2025, 9:11 AM
Marvin
08/22/2025, 9:11 AM
Marvin
08/22/2025, 9:12 AM
- Upgrade the server image to ...:3.4.7-python3.11-conda
- Let DB migrations finish and the API become healthy.
- Keep your workers on 3.2.0 initially — they’ll continue to run.
- Upgrade workers to 3.4.x when convenient to keep the stack aligned and to pick up fixes.
Notes
- Avoid using new 3.4-only features from your flow code until your workers are upgraded.
- If you’re chasing the Postgres deadlock you reported, the server upgrade plus running a single API replica typically reduces or removes those task_run upsert deadlocks. If needed, we can tune DB pool settings next.
Refs
- Server concepts and deployment: Prefect Server concepts
- Releases: Prefect releases
If you share how you deploy (Helm/Docker Compose) I can give exact upgrade steps and the image tag to use.
Marvin
08/22/2025, 9:15 AM
Marvin
08/22/2025, 9:16 AM
For the server, use the image ...:3.4.7-python3.11-conda.
References
- Server concepts: Prefect Server concepts
- Releases: Prefect releases (compare 3.2.0 → 3.4.7: diff)
If you tell me how you deploy (Helm/Docker Compose), I can share exact upgrade steps and image tags for both server and workers.
ASHIK
08/22/2025, 9:23 AM
Marvin
08/22/2025, 9:23 AM
Marvin
08/22/2025, 9:24 AM
Your existing workers can keep pointing PREFECT_API_URL at the server while you upgrade it.
Recommended rollout
- Snapshot your Aurora DB (server migrations are one-way).
- Temporarily scale the API to a single replica.
- Upgrade the server image to 3.4.7-python3.11-conda.
- Verify health (API up, DB migrations done, workers still polling).
- Later, upgrade workers to 3.4.x to pick up fixes and keep the stack aligned.
Refs
- Releases: Prefect releases
If you want, share how you deploy (Helm/Docker Compose/K8s manifests) and I’ll provide the exact image tag changes and a quick checklist.