Kartik
10/23/2025, 6:15 PM
Marvin
10/23/2025, 6:22 PM
prefect.yaml
```yaml
name: pg-project
prefect-version: ">=3.0.0"

# Define one or more deployments
deployments:
  - name: pg-transform
    description: "Extract from transactions, transform, and upsert into transformed_transactions in a single DB transaction."
    entrypoint: pg.py:pg_flow
    tags: ["postgres", "etl"]
    parameters: {} # add runtime flow params here if needed
    # The work pool your worker will poll. Replace with your actual pool name.
    work_pool:
      name: your-process-pool
      # Optional job variables per infrastructure type.
      # For a Process work pool you can pass env vars like this
      # ({{ $VAR }} is filled in from your shell when you run `prefect deploy`):
      job_variables:
        env:
          PG_CONNINFO: "{{ $PG_CONNINFO }}"
```
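On a Process work pool, entries under `job_variables.env` become ordinary environment variables of the flow-run process. pg.py isn't shown in this thread, so here is a hypothetical helper (`get_conninfo` is an illustrative name, not part of Prefect) that reads `PG_CONNINFO` and fails fast if it is missing or still looks like an unrendered placeholder such as a literal `${PG_CONNINFO}`:

```python
import os


def get_conninfo(var_name: str = "PG_CONNINFO") -> str:
    """Return the Postgres connection string from the environment.

    Hypothetical helper: raise a clear error up front instead of letting
    the database driver fail later with a confusing connection error.
    """
    value = os.environ.get(var_name, "").strip()
    if not value:
        raise RuntimeError(f"{var_name} is not set in the flow-run environment")
    # A value that still starts with template syntax was never substituted.
    if value.startswith(("${", "{{")):
        raise RuntimeError(f"{var_name} looks like an unrendered template: {value!r}")
    return value
```

Calling it at the top of the flow surfaces a misconfigured deployment before any database work starts.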
Optional: requirements.txt
```
prefect>=3.0.0
psycopg[binary]>=3.1
```
Optional: .env (example)
```
PG_CONNINFO=postgresql://postgres:postgres@localhost:5432/postgres
```
How to deploy and run
- From your project root (where prefect.yaml lives), create a work pool if you don't have one yet. For a local process worker:
  - Create the pool (one-time):
    `prefect work-pool create "your-process-pool" --type process`
  - Start a worker to poll that pool (it runs in the foreground, so use a separate terminal):
    `prefect worker start --pool your-process-pool`
- Register the deployment defined in prefect.yaml:
  `prefect deploy`
- Run it from the UI, or trigger it from the CLI. `prefect deployment run` takes `<flow-name>/<deployment-name>`; assuming `pg_flow` keeps its default flow name, `pg-flow`:
  `prefect deployment run "pg-flow/pg-transform"`
Notes
- This example does all DB work in one task to guarantee a single transaction. If you split extract/transform/load into multiple tasks, each task would use its own connection/transaction by default.
- Adjust table/column names, types, and the transformation logic to match your schema.
Kartik
10/24/2025, 6:34 AM
Marvin
10/24/2025, 6:35 AM
prefect.yaml
```yaml
name: pg-project
prefect-version: ">=3.0.0"

deployments:
  - name: pg-transform
    description: "SQLAlchemy ETL: transactions -> transformed_transactions in a single transaction."
    entrypoint: pg.py:pg_flow
    tags: ["postgres", "etl", "sqlalchemy"]
    parameters: {}
    work_pool:
      name: your-process-pool
      job_variables:
        env:
          # Prefer PG_SQLA_URL (SQLAlchemy URL). Example:
          # postgresql+psycopg://postgres:postgres@localhost:5432/postgres
          PG_SQLA_URL: "{{ $PG_SQLA_URL }}"
          # Or use PG_CONNINFO and it will be upgraded to +psycopg automatically:
          # PG_CONNINFO: "{{ $PG_CONNINFO }}"
```
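The comment above says a plain `PG_CONNINFO` is upgraded to the `+psycopg` driver automatically. pg.py isn't part of this thread, so the following is only a hypothetical sketch of that normalization (`to_sqlalchemy_url` is an illustrative name) using the standard library's `urllib.parse`. It assumes URL-style conninfo strings; keyword-style strings like `host=... dbname=...` are rejected rather than guessed at.

```python
from urllib.parse import urlsplit, urlunsplit


def to_sqlalchemy_url(url: str) -> str:
    """Hypothetical helper: make a Postgres URL select SQLAlchemy's psycopg (v3) driver."""
    parts = urlsplit(url)  # urlsplit already lowercases the scheme
    scheme = parts.scheme
    if scheme in ("postgres", "postgresql"):
        # Plain libpq-style URL: add the SQLAlchemy driver suffix.
        scheme = "postgresql+psycopg"
    elif not scheme.startswith("postgresql+"):
        # Not a URL we recognize (e.g. keyword-style conninfo): refuse to guess.
        raise ValueError(f"not a PostgreSQL URL: {url!r}")
    # URLs that already name a driver (postgresql+...) pass through unchanged;
    # user, password, host, port, database, and query string are preserved.
    return urlunsplit(parts._replace(scheme=scheme))
```

Mapping `postgres://` as well is useful because SQLAlchemy 1.4 and later no longer accept that scheme alias.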
requirements.txt
```
prefect>=3.0.0
sqlalchemy>=2.0
psycopg[binary]>=3.1
```
Example .env (for local dev)
```
PG_SQLA_URL=postgresql+psycopg://postgres:postgres@localhost:5432/postgres
```
Notes
- All DB work is inside a single engine.begin() transaction; success commits, errors roll back.
- The upsert uses PostgreSQL's `INSERT ... ON CONFLICT` via SQLAlchemy's PostgreSQL dialect `insert()` (`sqlalchemy.dialects.postgresql.insert`). Adjust column names and types to your schema.
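Both replies rely on the same idea: extract, transform, and upsert on one connection inside one transaction. Since pg.py isn't shown, here is a self-contained sketch of that pattern that uses Python's built-in `sqlite3` as a stand-in for Postgres so it runs without a server. SQLite's `INSERT ... ON CONFLICT(id) DO UPDATE` mirrors the PostgreSQL upsert, and `with conn:` plays the role of `engine.begin()` (or psycopg 3's `with psycopg.connect(...) as conn:`): commit on success, roll back on error. The `amount`/`amount_cents` columns, the cents transform, and the `fail_after_load` switch are all hypothetical.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """In-memory stand-in for the Postgres source and target tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE transactions (id INTEGER PRIMARY KEY, amount REAL);
        CREATE TABLE transformed_transactions (id INTEGER PRIMARY KEY, amount_cents INTEGER);
        INSERT INTO transactions VALUES (1, 12.5), (2, 3.25);
        """
    )
    return conn


def run_etl(conn: sqlite3.Connection, fail_after_load: bool = False) -> None:
    """Extract, transform, and upsert inside ONE transaction."""
    with conn:  # commits if the block succeeds, rolls back and re-raises if it fails
        # Extract
        rows = conn.execute("SELECT id, amount FROM transactions").fetchall()
        # Transform (toy example): convert amounts to integer cents
        transformed = [(tx_id, round(amount * 100)) for tx_id, amount in rows]
        # Load: upsert so re-runs update rows instead of duplicating them
        conn.executemany(
            """
            INSERT INTO transformed_transactions (id, amount_cents)
            VALUES (?, ?)
            ON CONFLICT(id) DO UPDATE SET amount_cents = excluded.amount_cents
            """,
            transformed,
        )
        if fail_after_load:
            # Simulates an error after the upsert; nothing above is kept.
            raise RuntimeError("simulated failure after the upsert")
```

Running `run_etl(make_db())` fills `transformed_transactions`; running it again updates the same rows in place, and a run with `fail_after_load=True` leaves the table exactly as it was before that run.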