# marvin-ai
k
@Marvin create a sample flow for me which will be deployed via prefect.yaml. The flow pg.py will simply connect to the Postgres database, extract the data from the transactions table, make some transformations, and finally insert it into the transformed_transactions table, all as a single transaction. Give me the sample code for the flow.
m
Here’s a minimal Prefect 3.x example you can drop into a project and deploy via prefect.yaml. The flow opens a single database transaction, reads from transactions, applies a simple transformation in memory, and upserts into transformed_transactions. If anything fails, the transaction is rolled back so nothing is partially written.

pg.py
```python
import os
from typing import Any, Dict, List
from urllib.parse import quote

import psycopg
from psycopg.rows import dict_row
from prefect import flow, task, get_run_logger


def _conninfo_from_env() -> str:
    """
    Reads connection info from either PG_CONNINFO or individual PG* env vars.
    Example PG_CONNINFO: 'postgresql://user:pass@host:5432/dbname'
    """
    conninfo = os.getenv("PG_CONNINFO")
    if conninfo:
        return conninfo
    host = os.getenv("PGHOST", "localhost")
    port = os.getenv("PGPORT", "5432")
    # Percent-encode credentials so characters like '@' or ':' don't break the URI
    user = quote(os.getenv("PGUSER", "postgres"), safe="")
    password = quote(os.getenv("PGPASSWORD", ""), safe="")
    dbname = os.getenv("PGDATABASE", "postgres")
    return f"postgresql://{user}:{password}@{host}:{port}/{dbname}"


@task
def transform_in_single_transaction() -> int:
    """
    Runs the entire extract + transform + load in ONE DB transaction.
    - Extract: reads rows from public.transactions
    - Transform: simple currency conversion + status normalization
    - Load: upserts into public.transformed_transactions
    Returns the number of rows written.
    """
    logger = get_run_logger()
    conninfo = _conninfo_from_env()

    # autocommit=False ensures we control commit/rollback
    with psycopg.connect(conninfo, autocommit=False) as conn:
        try:
            with conn.cursor(row_factory=dict_row) as cur:
                # Ensure target table exists (created inside the same transaction)
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS public.transformed_transactions (
                        transaction_id BIGINT PRIMARY KEY,
                        amount_dollars NUMERIC(18,2) NOT NULL,
                        created_at TIMESTAMPTZ NOT NULL,
                        customer_id BIGINT,
                        status TEXT NOT NULL,
                        processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
                    );
                    """
                )

                # Extract
                cur.execute(
                    """
                    SELECT id, amount_cents, created_at, customer_id, status
                    FROM public.transactions
                    """
                )
                rows: List[Dict[str, Any]] = cur.fetchall()
                logger.info(f"Fetched {len(rows)} rows from public.transactions")

                # Transform (example: cents -> dollars, normalize status)
                transformed = []
                for r in rows:
                    amount_dollars = (r["amount_cents"] or 0) / 100.0
                    status = (r["status"] or "").strip().upper()
                    transformed.append(
                        {
                            "transaction_id": r["id"],
                            "amount_dollars": amount_dollars,
                            "created_at": r["created_at"],
                            "customer_id": r["customer_id"],
                            "status": status,
                        }
                    )

                if not transformed:
                    logger.info("No rows to write.")
                    conn.commit()
                    return 0

                # Load (UPSERT to avoid duplicates on transaction_id)
                cur.executemany(
                    """
                    INSERT INTO public.transformed_transactions (
                        transaction_id, amount_dollars, created_at,
                        customer_id, status, processed_at
                    ) VALUES (
                        %(transaction_id)s, %(amount_dollars)s, %(created_at)s,
                        %(customer_id)s, %(status)s, NOW()
                    )
                    ON CONFLICT (transaction_id) DO UPDATE SET
                        amount_dollars = EXCLUDED.amount_dollars,
                        created_at = EXCLUDED.created_at,
                        customer_id = EXCLUDED.customer_id,
                        status = EXCLUDED.status,
                        processed_at = NOW()
                    """,
                    transformed,
                )

            # If we got here, everything above ran in a single transaction
            conn.commit()
            logger.info(f"Wrote {len(transformed)} rows to public.transformed_transactions")
            return len(transformed)
        except Exception as exc:
            conn.rollback()
            logger.error(f"Transaction rolled back due to error: {exc}")
            raise


@flow(name="pg-transform-flow")
def pg_flow():
    return transform_in_single_transaction()


if __name__ == "__main__":
    pg_flow()
```

prefect.yaml
```yaml
name: pg-project
prefect-version: ">=3.0.0"

# Define one or more deployments
deployments:
  - name: pg-transform
    description: "Extract from transactions, transform, and upsert into transformed_transactions in a single DB transaction."
    entrypoint: pg.py:pg_flow
    tags: ["postgres", "etl"]
    parameters: {}  # add runtime flow params here if needed
    # The work pool your worker will poll. Replace with your actual pool name.
    work_pool:
      name: your-process-pool
      # Optional job variables per infrastructure type.
      # For a Process work pool you can pass env vars like this.
      # The value is read from your environment when you run `prefect deploy`.
      job_variables:
        env:
          PG_CONNINFO: "{{ $PG_CONNINFO }}"
```

Optional: requirements.txt
```text
prefect>=3.0.0
psycopg[binary]>=3.1
```

Optional: .env (example)
```
PG_CONNINFO=postgresql://postgres:postgres@localhost:5432/postgres
```

How to deploy and run
- From your project root (where prefect.yaml lives), create a work pool if you don’t have one yet. For a local process worker:
  - Create the pool (one-time):
    ```bash
    prefect work-pool create "your-process-pool" --type process
    ```
- Start a worker to poll that pool:
  ```bash
  prefect worker start --pool your-process-pool
  ```
- Register the deployment defined in prefect.yaml:
  ```bash
  prefect deploy
  ```
- Run it from the UI, or from the CLI (which expects `<flow-name>/<deployment-name>`):
  ```bash
  prefect deployment run "pg-transform-flow/pg-transform"
  ```

Notes
- This example does all DB work in one task to guarantee a single transaction. If you split extract/transform/load into multiple tasks, each task would use its own connection and transaction by default, so a failure in a later task would not roll back writes made by an earlier one.
- Adjust table/column names, types, and the transformation logic to your schema.
k
Can you use SQLAlchemy for the same purpose? @Marvin
m
Absolutely. Here’s the same flow using SQLAlchemy (2.x), keeping everything in a single database transaction.

pg.py
```python
import os
from decimal import Decimal
from typing import Any, Dict, List
from urllib.parse import quote

from prefect import flow, task, get_run_logger
from sqlalchemy import (
    BigInteger,
    Column,
    MetaData,
    Numeric,
    Table,
    Text,
    TIMESTAMP,
    func,
    text,
    create_engine,
)
from sqlalchemy.dialects.postgresql import insert as pg_insert


def _sqlalchemy_url_from_env() -> str:
    """
    Returns a SQLAlchemy URL.
    Preferred:
      - PG_SQLA_URL with 'postgresql+psycopg://...'
    Fallbacks:
      - PG_CONNINFO (upgrades 'postgresql://' to 'postgresql+psycopg://')
      - individual PG* env vars (builds 'postgresql+psycopg://...')
    """
    url = os.getenv("PG_SQLA_URL")
    if url:
        return url
    conninfo = os.getenv("PG_CONNINFO")
    if conninfo:
        # Upgrade to the psycopg (v3) driver if no driver is specified
        if conninfo.startswith("postgresql://"):
            return conninfo.replace("postgresql://", "postgresql+psycopg://", 1)
        return conninfo
    host = os.getenv("PGHOST", "localhost")
    port = os.getenv("PGPORT", "5432")
    # Percent-encode credentials so special characters don't break the URL
    user = quote(os.getenv("PGUSER", "postgres"), safe="")
    password = quote(os.getenv("PGPASSWORD", ""), safe="")
    dbname = os.getenv("PGDATABASE", "postgres")
    return f"postgresql+psycopg://{user}:{password}@{host}:{port}/{dbname}"


@task
def transform_in_single_transaction() -> int:
    """
    Extracts from public.transactions, transforms, and upserts into
    public.transformed_transactions in ONE DB transaction.
    """
    logger = get_run_logger()
    url = _sqlalchemy_url_from_env()
    engine = create_engine(url, pool_pre_ping=True, future=True)

    metadata = MetaData()
    transformed_transactions = Table(
        "transformed_transactions",
        metadata,
        Column("transaction_id", BigInteger, primary_key=True),
        Column("amount_dollars", Numeric(18, 2), nullable=False),
        Column("created_at", TIMESTAMP(timezone=True), nullable=False),
        Column("customer_id", BigInteger),
        Column("status", Text, nullable=False),
        Column("processed_at", TIMESTAMP(timezone=True), server_default=func.now(), nullable=False),
        schema="public",
    )

    # engine.begin() commits when the block exits normally and rolls back on an exception
    with engine.begin() as conn:
        # Ensure the target table exists (DDL in the same transaction)
        metadata.create_all(bind=conn, tables=[transformed_transactions])

        # Extract
        result = conn.execute(
            text(
                """
                SELECT id, amount_cents, created_at, customer_id, status
                FROM public.transactions
                """
            )
        )
        rows = result.mappings().all()
        logger.info(f"Fetched {len(rows)} rows from public.transactions")

        # Transform (cents -> dollars as Decimal; normalize status)
        transformed: List[Dict[str, Any]] = []
        for r in rows:
            cents = r.get("amount_cents")
            amount_dollars = (Decimal(cents) if cents is not None else Decimal(0)) / Decimal(100)
            status = (r.get("status") or "").strip().upper()
            transformed.append(
                {
                    "transaction_id": r["id"],
                    "amount_dollars": amount_dollars,
                    "created_at": r["created_at"],
                    "customer_id": r.get("customer_id"),
                    "status": status,
                    # processed_at uses the server default on insert
                }
            )

        if not transformed:
            logger.info("No rows to write.")
            return 0

        # Load with a PostgreSQL upsert
        insert_stmt = pg_insert(transformed_transactions).values(transformed)
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=[transformed_transactions.c.transaction_id],
            set_={
                "amount_dollars": insert_stmt.excluded.amount_dollars,
                "created_at": insert_stmt.excluded.created_at,
                "customer_id": insert_stmt.excluded.customer_id,
                "status": insert_stmt.excluded.status,
                "processed_at": func.now(),
            },
        )
        conn.execute(upsert_stmt)
        logger.info(f"Wrote {len(transformed)} rows to public.transformed_transactions")
        return len(transformed)


@flow(name="pg-transform-flow")
def pg_flow():
    return transform_in_single_transaction()


if __name__ == "__main__":
    pg_flow()
```

prefect.yaml
```yaml
name: pg-project
prefect-version: ">=3.0.0"

deployments:
  - name: pg-transform
    description: "SQLAlchemy ETL: transactions -> transformed_transactions in a single transaction."
    entrypoint: pg.py:pg_flow
    tags: ["postgres", "etl", "sqlalchemy"]
    parameters: {}
    work_pool:
      name: your-process-pool
      job_variables:
        env:
          # Prefer PG_SQLA_URL (SQLAlchemy URL). Example:
          #   postgresql+psycopg://postgres:postgres@localhost:5432/postgres
          # The value is read from your environment when you run `prefect deploy`.
          PG_SQLA_URL: "{{ $PG_SQLA_URL }}"
          # Or use PG_CONNINFO; it will be upgraded to +psycopg automatically:
          # PG_CONNINFO: "{{ $PG_CONNINFO }}"
```

requirements.txt
```text
prefect>=3.0.0
sqlalchemy>=2.0
psycopg[binary]>=3.1
```

Example .env (for local dev)
```
PG_SQLA_URL=postgresql+psycopg://postgres:postgres@localhost:5432/postgres
```

Notes
- All DB work is inside a single engine.begin() transaction; success commits, errors roll back.
- The upsert uses PostgreSQL’s ON CONFLICT through SQLAlchemy’s PostgreSQL-dialect insert. Adjust column names/types to your schema.
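One caveat with the single `.values(transformed)` statement: every row contributes one bind parameter per column, and libpq (which psycopg 3 uses) rejects a statement with more than 65,535 parameters. With the five columns above, one statement can carry at most 65,535 // 5 = 13,107 rows. A minimal sketch, assuming you keep the multi-row VALUES approach: the hypothetical helpers `max_rows_per_statement` and `batched_rows` (stdlib only, not part of SQLAlchemy or Prefect) split the rows so you issue one upsert per batch inside the same `engine.begin()` block, which keeps the whole load in one transaction.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

# libpq (used by psycopg 3) rejects statements with more than 65,535 bind parameters
PG_MAX_BIND_PARAMS = 65_535


def max_rows_per_statement(columns_per_row: int, limit: int = PG_MAX_BIND_PARAMS) -> int:
    """Largest number of rows whose parameters fit in one multi-row INSERT."""
    if columns_per_row <= 0:
        raise ValueError("columns_per_row must be positive")
    return limit // columns_per_row


def batched_rows(rows: Iterable[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most `size` rows, preserving order."""
    if size <= 0:
        raise ValueError("size must be positive")
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


# Sketch of use inside the existing `with engine.begin() as conn:` block:
#   for batch in batched_rows(transformed, max_rows_per_statement(5)):
#       insert_stmt = pg_insert(transformed_transactions).values(batch)
#       upsert_stmt = insert_stmt.on_conflict_do_update(...)  # same set_ as before
#       conn.execute(upsert_stmt)
```

Because every batch runs on the same `conn`, a failure in any batch still rolls back all earlier batches when `engine.begin()` exits with the exception.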