alex joyce
09/17/2025, 7:01 AM
Marvin
09/17/2025, 7:01 AM
Marvin
09/17/2025, 7:16 AM
- Table and index sizes
select
n.nspname as schema,
c.relname as table,
pg_size_pretty(pg_relation_size(c.oid)) as table_size,
pg_size_pretty(pg_indexes_size(c.oid)) as indexes_size,
pg_size_pretty(pg_total_relation_size(c.oid)) as total_size
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
where c.relkind = 'r' and n.nspname = :schema
order by pg_total_relation_size(c.oid) desc;
- Estimated row counts
select
n.nspname as schema,
c.relname as table,
c.reltuples::bigint as estimated_rows
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
where c.relkind = 'r' and n.nspname = :schema
order by estimated_rows desc;
- Table activity/usage stats
select
schemaname as schema,
relname as table,
seq_scan, idx_scan,
n_tup_ins, n_tup_upd, n_tup_del, n_tup_hot_upd,
vacuum_count, autovacuum_count, analyze_count, autoanalyze_count,
last_vacuum, last_autovacuum, last_analyze, last_autoanalyze
from pg_stat_user_tables
where schemaname = :schema
order by (seq_scan + coalesce(idx_scan, 0)) desc; -- idx_scan is NULL for tables without indexes
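To turn those counters into something actionable, one common heuristic is to flag tables that are read mostly by sequential scans. Here is a minimal sketch in Python. It assumes the rows come back as dicts keyed by the column aliases above (table, seq_scan, idx_scan). The 90% threshold and the 100-scan floor are arbitrary illustrative values, and keep in mind that small tables are often seq-scanned by design:

```python
from typing import Any, Optional


def seq_scan_share(row: dict[str, Any]) -> Optional[float]:
    """Fraction of a table's scans that were sequential, or None if it was never scanned."""
    seq = row.get("seq_scan") or 0
    idx = row.get("idx_scan") or 0  # idx_scan is NULL (None) for tables without indexes
    total = seq + idx
    return None if total == 0 else seq / total


def mostly_seq_scanned(
    rows: list[dict[str, Any]], threshold: float = 0.9, min_scans: int = 100
) -> list[str]:
    """Tables with enough scans overall, most of them sequential (illustrative thresholds)."""
    flagged = []
    for row in rows:
        share = seq_scan_share(row)
        scans = (row.get("seq_scan") or 0) + (row.get("idx_scan") or 0)
        if share is not None and scans >= min_scans and share >= threshold:
            flagged.append(row["table"])
    return flagged


rows = [
    {"table": "orders", "seq_scan": 950, "idx_scan": 50},
    {"table": "users", "seq_scan": 10, "idx_scan": 5000},
    {"table": "audit", "seq_scan": 3, "idx_scan": None},
]
print(mostly_seq_scanned(rows))  # ['orders']
```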
- Indexes and definitions
select
i.schemaname as schema,
i.tablename as table,
i.indexname,
i.indexdef
from pg_indexes i
where i.schemaname = :schema
order by i.tablename, i.indexname;
- Index usage stats
select
schemaname as schema,
relname as table,
indexrelname as index,
idx_scan, idx_tup_read, idx_tup_fetch
from pg_stat_user_indexes
where schemaname = :schema
order by idx_scan desc;
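The index usage stats can be used to spot removal candidates. This sketch assumes dict rows keyed by the aliases above (table, index, idx_scan). The keep set is a hypothetical parameter for indexes you must retain anyway, such as primary keys and unique constraints. Zero scans only means "none since the last statistics reset", so check how long the counters have been accumulating before dropping anything:

```python
from typing import Any, Iterable


def unused_indexes(rows: Iterable[dict[str, Any]], keep: frozenset = frozenset()) -> list[str]:
    """'table.index' for every index with zero recorded scans, except names listed in `keep`."""
    return [
        f"{r['table']}.{r['index']}"
        for r in rows
        if (r.get("idx_scan") or 0) == 0 and r["index"] not in keep
    ]


rows = [
    {"table": "orders", "index": "orders_pkey", "idx_scan": 0},
    {"table": "orders", "index": "orders_created_at_idx", "idx_scan": 0},
    {"table": "users", "index": "users_email_idx", "idx_scan": 1234},
]
# Keep the primary key: it enforces uniqueness even if no query ever scans it.
print(unused_indexes(rows, keep=frozenset({"orders_pkey"})))  # ['orders.orders_created_at_idx']
```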
- Columns and data types
select
table_schema, table_name, ordinal_position, column_name, data_type, is_nullable
from information_schema.columns
where table_schema = :schema
order by table_name, ordinal_position;
- Foreign keys (table-level dependencies)
select
tc.table_schema, tc.table_name, tc.constraint_name,
kcu.column_name,
ccu.table_schema as foreign_table_schema,
ccu.table_name as foreign_table_name,
ccu.column_name as foreign_column_name
from information_schema.table_constraints tc
join information_schema.key_column_usage kcu
on tc.constraint_name = kcu.constraint_name and tc.constraint_schema = kcu.constraint_schema
join information_schema.constraint_column_usage ccu
on ccu.constraint_name = tc.constraint_name and ccu.constraint_schema = tc.constraint_schema
where tc.constraint_type = 'FOREIGN KEY' and tc.table_schema = :schema
order by tc.table_name, tc.constraint_name, kcu.ordinal_position;
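If you want a load (or drop) order from those foreign-key rows, a topological sort gives you one. Below is a sketch using Python's standard graphlib. It assumes a single schema (it keys only on table_name and foreign_table_name) and ignores self-references. A genuine FK cycle between different tables raises graphlib.CycleError:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Any


def load_order(fk_rows: list[dict[str, Any]]) -> list[str]:
    """Order tables so every referenced (parent) table precedes the tables that reference it."""
    graph: dict[str, set[str]] = {}
    for r in fk_rows:
        child, parent = r["table_name"], r["foreign_table_name"]
        graph.setdefault(child, set())
        graph.setdefault(parent, set())
        if child != parent:  # self-references (e.g. manager_id -> id) would look like a cycle
            graph[child].add(parent)  # graphlib expects: node -> its predecessors
    return list(TopologicalSorter(graph).static_order())


fk_rows = [
    {"table_name": "order_items", "foreign_table_name": "orders"},
    {"table_name": "order_items", "foreign_table_name": "products"},
    {"table_name": "orders", "foreign_table_name": "customers"},
    {"table_name": "employees", "foreign_table_name": "employees"},
]
print(load_order(fk_rows))  # parents first; reverse the list for a safe drop order
```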
- Views that reference tables (view dependencies)
select
view_schema, view_name, table_schema, table_name
from information_schema.view_table_usage
where table_schema = :schema
order by view_schema, view_name, table_schema, table_name;
A single consolidated “size + usage” view per table
with sizes as (
select
n.nspname as schema,
c.relname as table,
pg_total_relation_size(c.oid) as total_bytes
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
where c.relkind = 'r' and n.nspname = :schema
)
select
s.schema,
s.table,
pg_size_pretty(s.total_bytes) as total_size,
st.seq_scan, st.idx_scan,
st.n_tup_ins, st.n_tup_upd, st.n_tup_del,
st.last_autovacuum, st.last_autoanalyze
from sizes s
left join pg_stat_user_tables st
on st.schemaname = s.schema and st.relname = s.table
order by s.total_bytes desc;
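Note that this query sorts on the raw byte count and only formats for display. Sorting the pg_size_pretty strings would compare them as text, so "9 MB" would land above "10 GB" in descending order. If you pull raw bytes instead (as the sizes query in the Prefect flow does), a small formatter can be applied at the very end. The pretty_bytes helper below is hypothetical, and its rounding is simpler than pg_size_pretty's, so the strings won't always match exactly:

```python
def pretty_bytes(num_bytes: int) -> str:
    """Format a byte count with 1024-based units (simplified; not identical to pg_size_pretty)."""
    units = ["bytes", "kB", "MB", "GB", "TB", "PB"]
    value = float(num_bytes)
    for unit in units[:-1]:
        if abs(value) < 1024:
            return f"{int(value)} {unit}" if unit == "bytes" else f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} {units[-1]}"


# Sort on raw bytes first, format last.
sizes = [
    {"table_name": "users", "total_bytes": 1536},
    {"table_name": "logs", "total_bytes": 5 * 1024**3},
]
for row in sorted(sizes, key=lambda r: r["total_bytes"], reverse=True):
    print(row["table_name"], pretty_bytes(row["total_bytes"]))
```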
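The :schema placeholders above are SQLAlchemy-style bind parameters, filled in when a query is wrapped in sqlalchemy.text(). In psql, run \set schema public first and write :'schema' instead.
Run these from a Prefect 3.x flow using prefect-sqlalchemy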
- Install: pip install prefect prefect-sqlalchemy sqlalchemy psycopg2-binary
- Docs: prefect-sqlalchemy docs
Create and use a SqlAlchemyConnector block (keeps credentials out of code)
```python
from prefect_sqlalchemy import SqlAlchemyConnector

# Run once to create/save the block (or create it in the UI).
# Replace the placeholder URL with your own connection details.
connector = SqlAlchemyConnector(
    connection_info="postgresql+psycopg2://user:pass@host:5432/dbname",
)
connector.save("my-postgres", overwrite=True)
```
Reusable Prefect tasks/flow to run any query
```python
from typing import Any, Optional

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector
from sqlalchemy import text


@task
def run_query(sql: str, params: Optional[dict[str, Any]] = None) -> list[dict[str, Any]]:
    connector = SqlAlchemyConnector.load("my-postgres")
    with connector.get_connection(begin=False) as conn:
        # text() binds :schema-style placeholders; exec_driver_sql would pass the SQL
        # straight to psycopg2, which expects %(schema)s-style placeholders instead
        result = conn.execute(text(sql), params or {})
        return [dict(row) for row in result.mappings()]


@flow
def introspect(schema: str = "public"):
    # .submit() lets the four queries run concurrently on the flow's task runner
    tables = run_query.submit("""
        select table_schema, table_name
        from information_schema.tables
        where table_type = 'BASE TABLE' and table_schema = :schema
        order by 1, 2
    """, {"schema": schema})
    sizes = run_query.submit("""
        select n.nspname as table_schema, c.relname as table_name,
               pg_relation_size(c.oid) as table_bytes,
               pg_indexes_size(c.oid) as index_bytes,
               pg_total_relation_size(c.oid) as total_bytes
        from pg_class c join pg_namespace n on n.oid = c.relnamespace
        where c.relkind = 'r' and n.nspname = :schema
    """, {"schema": schema})
    stats = run_query.submit("""
        select schemaname as table_schema, relname as table_name,
               seq_scan, idx_scan, n_tup_ins, n_tup_upd, n_tup_del,
               last_autovacuum, last_autoanalyze
        from pg_stat_user_tables
        where schemaname = :schema
    """, {"schema": schema})
    indexes = run_query.submit("""
        select schemaname as schema, tablename as table, indexname, indexdef
        from pg_indexes
        where schemaname = :schema
        order by tablename, indexname
    """, {"schema": schema})
    return {
        "tables": tables.result(),
        "sizes": sizes.result(),
        "stats": stats.result(),
        "indexes": indexes.result(),
    }


if __name__ == "__main__":
    introspect()
```
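The flow returns plain lists of dicts, so persisting them needs nothing beyond the standard library. Here is a minimal sketch that writes one CSV per result set. write_csvs is a hypothetical helper, not part of Prefect. You could call it at the end of introspect or on its return value:

```python
import csv
from pathlib import Path
from typing import Any


def write_csvs(results: dict[str, list[dict[str, Any]]], out_dir: str) -> list[Path]:
    """Write each result set (e.g. "tables", "sizes") to <out_dir>/<name>.csv."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    written = []
    for name, rows in results.items():
        path = out / f"{name}.csv"
        # Header comes from the first row's keys; an empty result set yields an empty file.
        fieldnames = list(rows[0].keys()) if rows else []
        with path.open("w", newline="", encoding="utf-8") as fh:
            if fieldnames:
                writer = csv.DictWriter(fh, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(rows)
        written.append(path)
    return written


# Hypothetical usage with the introspect flow (needs a reachable database):
# write_csvs(introspect(schema="public"), "introspection_output")
```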
Notes and caveats
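- Stats freshness: estimated row counts (pg_class.reltuples) are refreshed by VACUUM/ANALYZE (manual or autovacuum), so run ANALYZE first if they look stale. The scan and tuple counters in pg_stat_* views are cumulative since the last statistics reset.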
- Permissions: you’ll need privileges to read catalog views and schemas.
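- “Last access time” per table is not tracked before PostgreSQL 16; from 16 onward, pg_stat_user_tables has last_seq_scan and last_idx_scan columns. On older versions, approximate usage from changes in seq_scan/idx_scan over time, or use query logging or pg_stat_statements (which must be loaded via shared_preload_libraries and then created with CREATE EXTENSION). Those give you top queries, not table-level hits.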
- For dependencies in functions/stored procedures, deeper catalog queries using pg_depend/pg_proc can help; tell me if you need that specifically.
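Because the pg_stat_* counters are cumulative, one way to approximate "which tables were used recently" on any Postgres version is to snapshot pg_stat_user_tables twice and diff the snapshots. This sketch assumes each snapshot is a list of dicts keyed like the table activity query (table, seq_scan, idx_scan, n_tup_*). The function names are hypothetical. If a counter went down, stats were reset between snapshots and that table is skipped:

```python
from typing import Any

COUNTERS = ("seq_scan", "idx_scan", "n_tup_ins", "n_tup_upd", "n_tup_del")


def activity_delta(
    before: list[dict[str, Any]], after: list[dict[str, Any]]
) -> dict[str, dict[str, int]]:
    """Per-table counter increases between two snapshots of pg_stat_user_tables."""
    baseline = {row["table"]: row for row in before}
    deltas: dict[str, dict[str, int]] = {}
    for row in after:
        old = baseline.get(row["table"])
        if old is None:
            continue  # table appeared after the first snapshot; nothing to diff against
        diff = {c: (row.get(c) or 0) - (old.get(c) or 0) for c in COUNTERS}
        if any(v < 0 for v in diff.values()):
            continue  # a counter went backwards, so stats were reset; skip this table
        deltas[row["table"]] = diff
    return deltas


def touched_tables(before: list[dict[str, Any]], after: list[dict[str, Any]]) -> list[str]:
    """Tables whose scan or write counters moved between the two snapshots."""
    return sorted(t for t, d in activity_delta(before, after).items() if any(d.values()))


before = [
    {"table": "orders", "seq_scan": 10, "idx_scan": 200, "n_tup_ins": 50},
    {"table": "archive", "seq_scan": 4, "idx_scan": None},
]
after = [
    {"table": "orders", "seq_scan": 10, "idx_scan": 260, "n_tup_ins": 55},
    {"table": "archive", "seq_scan": 4, "idx_scan": None},
]
print(touched_tables(before, after))  # ['orders']
```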
Helpful references
- Postgres stats views: Monitoring Stats
- pg_stat_user_tables: Table statistics view
- Index stats: Index statistics
- Size functions: Database size functions
- Prefect SQLAlchemy integration: prefect-sqlalchemy docs
If you share which of the above outputs you need (and your schema/database), I can provide a focused query or a ready-made Prefect flow that exports the results to CSV/Parquet and schedules it.
alex joyce
09/17/2025, 7:18 AM
alex joyce
09/17/2025, 8:31 AM
Marvin
09/17/2025, 8:31 AM
Marvin
09/17/2025, 8:32 AM
- Tables in the public schema:
select table_name
from information_schema.tables
where table_schema = 'public'
order by table_name;
- Quick row counts for all tables:
select
relname as table_name,
n_live_tup as estimated_rows
from pg_stat_user_tables
order by estimated_rows desc;
- Sizes (table + indexes):
select
relname as table_name,
pg_size_pretty(pg_total_relation_size(relid)) as total_size
from pg_catalog.pg_statio_user_tables
order by pg_total_relation_size(relid) desc;
Where this schema comes from
- ORM model definitions (authoritative list of tables/columns): prefect/server/database/orm_models.py
- Alembic migrations (history of schema changes): migrations/versions
Clarifications that help me tailor more detail
- Are you on Prefect Cloud or self-hosting Prefect Server with Postgres?
- Do you want specific columns/relationships for certain tables (I can share targeted column lists/ERD-style notes)?
- Do you want sample queries to join across flows/runs/states/logs for reporting?
alex joyce
09/18/2025, 5:01 AM
Marvin
09/18/2025, 5:01 AM
alex joyce
09/18/2025, 6:39 AM
Marvin
09/18/2025, 6:40 AM
Marvin
09/18/2025, 6:46 AM
- Workers: a process (prefect worker start --pool my-pool) that polls a work pool and launches infrastructure (process, Docker, Kubernetes, etc.) to execute flow runs.
- Work Queues (optional): Named queues inside a pool that let you shard and route flow runs. Each queue has a priority (lower number = higher priority) and can have its own concurrency limit; a worker can be restricted to specific queues with --work-queue.
What actually happens internally
1) Polling and reservations
- Each worker loops, asking the server for runs with something equivalent to “get scheduled flow runs” for its work pool (optionally filtered to specific work queues).
- To claim a run, the worker proposes a transition from Scheduled to Pending. The server's orchestration rules accept only the first such proposal; any other worker that tried to submit the same run is told to abort and skips it, so two workers never start the same run.
- Link: REST: get scheduled flow runs
- Concept links:
- Work pools (concepts)
- Workers (concepts)
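To make the claim step concrete, here is a toy, in-memory model of several workers racing for the same runs. In Prefect, a worker claims a run by proposing a Pending state, and the server accepts only the first proposal. ToyServer and its methods are hypothetical stand-ins, not Prefect's API; only that idea carries over: an atomic Scheduled-to-Pending check lets exactly one worker win each run, and a per-worker limit caps how many runs each worker starts:

```python
import threading
from dataclasses import dataclass


@dataclass
class Run:
    id: int
    state: str = "SCHEDULED"


class ToyServer:
    """Hypothetical in-memory stand-in for the API; models only the state transition."""

    def __init__(self, runs: list[Run]) -> None:
        self.runs = {r.id: r for r in runs}
        self._lock = threading.Lock()

    def scheduled_run_ids(self) -> list[int]:
        with self._lock:
            return [r.id for r in self.runs.values() if r.state == "SCHEDULED"]

    def propose_pending(self, run_id: int) -> bool:
        # Atomic check-and-set: only the first proposal for a SCHEDULED run is accepted.
        with self._lock:
            run = self.runs[run_id]
            if run.state != "SCHEDULED":
                return False  # another worker already claimed it
            run.state = "PENDING"
            return True


def worker(server: ToyServer, name: str, claimed: list, limit: int) -> None:
    started = 0
    for run_id in server.scheduled_run_ids():  # workers may see overlapping candidates
        if started >= limit:
            break  # this worker is full; leave the rest for other workers
        if server.propose_pending(run_id):
            claimed.append((name, run_id))  # list.append is thread-safe in CPython
            started += 1


server = ToyServer([Run(i) for i in range(6)])
claimed: list = []
threads = [threading.Thread(target=worker, args=(server, f"w{i}", claimed, 2)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(run_id for _, run_id in claimed))  # [0, 1, 2, 3, 4, 5]: each run claimed once
```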
2) Ordering (FIFO/LIFO?)
- There is no user-facing LIFO knob. Practically, the server returns eligible runs ordered by their scheduled start time (earliest first).
- If a run is “late” (scheduled in the past), it’s just an earlier scheduled time — so it will tend to be picked first.
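- Work queues affect ordering as well as filtering: within a pool, runs in higher-priority queues (lower priority number) are offered before those in lower-priority queues, and within a queue the earliest scheduled time wins.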
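Prefect's work pool docs describe per-queue priorities, where a lower number means higher priority. Assuming those priorities are applied before scheduled time, here is a toy version of the selection logic. next_runs and the dict fields are hypothetical, and the sketch ignores details such as the worker's prefetch window:

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def next_runs(
    runs: list[dict[str, Any]],
    queue_priority: dict[str, int],
    now: datetime,
    worker_queues: Optional[set[str]] = None,
    limit: Optional[int] = None,
) -> list[str]:
    """Toy selection: due runs only, lower queue priority number first, then earliest scheduled."""
    eligible = [
        r for r in runs
        if r["scheduled"] <= now  # late runs qualify too; they just have earlier times
        and (worker_queues is None or r["queue"] in worker_queues)
    ]
    eligible.sort(key=lambda r: (queue_priority[r["queue"]], r["scheduled"]))
    return [r["id"] for r in eligible][:limit]


now = datetime(2025, 9, 18, 12, 0, tzinfo=timezone.utc)
runs = [
    {"id": "a", "queue": "default", "scheduled": now - timedelta(minutes=10)},  # late
    {"id": "b", "queue": "high-priority", "scheduled": now - timedelta(minutes=1)},
    {"id": "c", "queue": "high-priority", "scheduled": now - timedelta(minutes=5)},
    {"id": "d", "queue": "default", "scheduled": now + timedelta(minutes=30)},  # not due yet
]
priorities = {"high-priority": 1, "default": 2}
print(next_runs(runs, priorities, now))  # ['c', 'b', 'a']
print(next_runs(runs, priorities, now, worker_queues={"default"}))  # ['a']
```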
3) Concurrency and capacity
- Worker capacity: a worker started with --limit N runs at most N flow runs at once. When it is at its limit, it stops submitting newly fetched runs until one of its runs finishes.
- Work pool concurrency: Pools can have a global concurrency limit. If the pool is at its limit, additional runs won’t be given out to workers until slots free up.
- CLI: prefect work-pool set-concurrency-limit "my-pool" 10
- Tag-based concurrency limits: these apply to task runs, not flow runs. A task run whose tag is at its limit waits for a free slot; tag limits do not affect which flow runs workers pick up. To cap flow runs, use pool or queue concurrency limits (or a deployment-level concurrency limit).
- Link: Tag-based concurrency limits
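The number of new runs a worker can start at any moment is bounded by every applicable limit at once. Here is a toy calculation to illustrate. startable_runs is hypothetical, and real workers also account for queue limits and polling timing:

```python
from typing import Optional


def startable_runs(
    waiting: int,
    worker_limit: int,
    worker_running: int,
    pool_limit: Optional[int],
    pool_running: int,
) -> int:
    """Toy calculation: how many waiting flow runs this worker could start right now."""
    worker_free = max(worker_limit - worker_running, 0)
    # pool_running counts runs across *all* workers in the pool; None means no pool cap.
    pool_free = waiting if pool_limit is None else max(pool_limit - pool_running, 0)
    return min(waiting, worker_free, pool_free)


print(startable_runs(waiting=10, worker_limit=4, worker_running=1, pool_limit=10, pool_running=8))  # 2
print(startable_runs(waiting=10, worker_limit=4, worker_running=0, pool_limit=None, pool_running=50))  # 4
```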
4) Fairness across multiple workers
- Multiple workers polling the same pool may receive overlapping lists of the earliest eligible runs; because only the first Pending proposal for a run succeeds, each run is started by exactly one worker.
- There’s no explicit global “fairness” policy beyond earliest scheduled time and the natural effect of multiple workers polling concurrently.
5) Late runs and retries
- Late runs are picked first because they have the earliest scheduled time.
- If a run can’t start because a pool or queue concurrency limit is full, it stays Scheduled (shown as Late once its scheduled time has passed) and is offered to workers again on later polls once a slot frees up.
CLI you’ll use
- Start a worker:
- prefect worker start --pool my-pool
- Optionally limit to specific queues: prefect worker start --pool my-pool --work-queue high-priority --work-queue default
- Manage a pool’s concurrency:
- prefect work-pool set-concurrency-limit "my-pool" 10
- prefect work-pool clear-concurrency-limit "my-pool"
Key docs and references
- Workers: Workers (concepts)
- Work Pools: Work pools (concepts)
- API (server): Get scheduled flow runs
- Tag concurrency: Tag-based concurrency limits
Marvin
09/18/2025, 6:46 AM
alex joyce
09/18/2025, 9:43 AM
Marvin
09/18/2025, 9:43 AM
Marvin
09/18/2025, 9:56 AM
.submit(...).
Basic threaded concurrency
- Configure your flow with ThreadPoolTaskRunner
- Call tasks with .submit() to schedule them concurrently
- Collect results from the returned PrefectFuture objects
Example (I/O-bound work, e.g., HTTP calls)
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner


@task
def fetch_one(i):
    # do some I/O (HTTP, DB, etc.)
    return f"result-{i}"


@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def my_flow():
    futures = [fetch_one.submit(i) for i in range(20)]
    results = [f.result() for f in futures]
    print(results)


if __name__ == "__main__":
    my_flow()
```
Notes
- Calling fetch_one(i) runs synchronously; use fetch_one.submit(i) for concurrency.
- max_workers controls the thread pool size (tune it for your workload).
- This approach is best for I/O-bound tasks (HTTP, DB, file I/O).
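To see why .submit() plus a thread pool helps for I/O-bound work, here is the same submit-then-collect pattern with plain concurrent.futures (no Prefect), with time.sleep standing in for network latency. Prefect's ThreadPoolTaskRunner uses a thread pool under the hood, but this sketch only demonstrates the timing effect; fake_io and the helper names are hypothetical:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(i: int) -> str:
    time.sleep(0.2)  # stands in for an HTTP/DB round trip that releases the GIL
    return f"result-{i}"


def run_sequential(n: int) -> tuple[list[str], float]:
    start = time.perf_counter()
    results = [fake_io(i) for i in range(n)]
    return results, time.perf_counter() - start


def run_threaded(n: int, max_workers: int) -> tuple[list[str], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fake_io, i) for i in range(n)]  # like task.submit()
        results = [f.result() for f in futures]  # like future.result(); keeps input order
    return results, time.perf_counter() - start


seq_results, seq_elapsed = run_sequential(5)
thr_results, thr_elapsed = run_threaded(5, max_workers=5)
print(f"sequential: {seq_elapsed:.2f}s, threaded: {thr_elapsed:.2f}s")
```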
CPU-bound work
If your task is CPU-bound, prefer the process-based runner to sidestep the GIL:
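```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner


@task
def heavy_compute(x):
    return x * x  # pretend this is CPU-heavy


@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def my_cpu_flow():
    futures = [heavy_compute.submit(i) for i in range(8)]
    results = [f.result() for f in futures]
    print(results)


# The main guard matters for process pools: on spawn-based platforms
# (macOS, Windows) child processes re-import this module.
if __name__ == "__main__":
    my_cpu_flow()
```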
Common tips
- Retries: Define on the task, e.g. @task(retries=3, retry_delay_seconds=5).
- Thread safety: Ensure your task code (and libraries) can safely run in parallel threads.
- Don’t hide work in your own threads: Prefer .submit() so Prefect can track, retry, and log properly. Spawning your own threads inside a task bypasses orchestration.
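Retry counts are easy to misread: retries=3 means up to three retries after the first attempt, so four attempts in total. Here is a plain-Python sketch of those semantics. with_retries is a hypothetical decorator, not Prefect's implementation, and unlike Prefect it retries on any exception with a fixed delay:

```python
import time
from functools import wraps
from typing import Any, Callable, TypeVar

T = TypeVar("T")


def with_retries(
    retries: int, retry_delay_seconds: float = 0.0
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Hypothetical decorator: make up to 1 + `retries` attempts, sleeping between them."""
    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)
            raise AssertionError("unreachable")
        return wrapper
    return decorator


calls = {"n": 0}


@with_retries(retries=3)
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


print(flaky(), calls["n"])  # ok 3  (two failures, then success on the third attempt)
```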
Docs
- Task runners overview: Task runners (Prefect 3.x)
If you share your workload (I/O-bound vs CPU-bound, expected concurrency), I can suggest a max_workers value and runner choice tailored to your case.