Kartik
10/24/2025, 8:32 AM
Marvin
10/24/2025, 8:32 AM
Marvin
10/24/2025, 8:40 AM
task.fn) so they run in the same thread and share the same transaction.
Below is a refactor that preserves your intent and guarantees a rollback on any error.
Key changes:
- Open the DB connection and begin the transaction inside a single transactional task.
- Call your “process_and_save_…” helpers as plain functions or with .fn if you keep them decorated with @task.
- Pass the same connection to pd.read_sql and to_sql so they participate in the same transaction.
- Don’t create the connection or start the transaction in the flow; create/use them in the task’s thread.
Example
```
import pandas as pd
from prefect import flow, task, get_run_logger
# from sqlalchemy import text  # if you're on SQLAlchemy 2.0, use text() in read_sql

# get_conn() and load_env_vars_dynamic() are your existing helpers (not shown here).

# If your helpers are currently @task, either:
# - remove @task so they are plain functions, OR
# - keep @task but call them with .fn(...) inside the transactional task

# Plain functions (recommended) -----------------------------------------------
def process_and_save_transaction_metadata(transaction, conn):
    # ... compute transaction_metadata
    # pd.DataFrame(transaction_metadata).to_sql(..., con=conn, schema="marketplaces", if_exists="append", index=False)
    return


def process_and_save_transaction_contexts(transaction, conn):
    # ...
    return


def process_and_save_transaction_breakdowns(transaction, conn):
    # ...
    # pd.DataFrame(transaction_breakdowns).to_sql(..., con=conn, schema="marketplaces", if_exists="append", index=False)
    return


def process_and_save_transaction_items_breakdowns(transaction, conn):
    # ...
    return


@task(log_prints=True)
def run_transactional_breakdown(database: str, store_timezone_offset_minutes: str = "330"):
    """
    Owns the connection and the transaction so all work is atomic.
    """
    # IMPORTANT: open the connection inside the task (not in the flow)
    conn = get_conn()     # should return a SQLAlchemy Connection (or Engine; see notes below)
    trans = conn.begin()  # start a DB transaction
    try:
        # For SQLAlchemy 2.x, consider: pd.read_sql(text("select ..."), conn)
        transactions = pd.read_sql("select * from marketplaces.amazon_raw_transactions", conn)
        for _, transaction in transactions.iterrows():
            # Call helpers as plain functions so they run in this same thread/transaction
            process_and_save_transaction_metadata(transaction, conn)
            process_and_save_transaction_contexts(transaction, conn)
            process_and_save_transaction_breakdowns(transaction, conn)
            process_and_save_transaction_items_breakdowns(transaction, conn)
        trans.commit()
        print("✅ Transformation complete. Data saved to 'marketplaces' schema.")
    except Exception:
        # Undo every write made in this transaction
        trans.rollback()
        # Prefect will mark the task as failed; re-raise for visibility
        raise
    finally:
        conn.close()
        print("Connection closed")


@flow(log_prints=True)
def transform_amazon_transactions(database: str, store_timezone_offset_minutes: str = "330"):
    logger = get_run_logger()
    logger.info(f"Loading Environment Variables for Database: {database}")
    load_env_vars_dynamic(
        database=database,
        db_host="xyz.rds.amazonaws.com",  # explicit dev host while data is only in dev
    )
    logger.info(f"Environment is Ready For Database: {database}")
    logger.info(f"Running Flow For Database: {database}")
    # Run everything that must be atomic in a single transactional task
```
Marvin
10/24/2025, 8:40 AM
If you prefer to keep the four “process_and_save_…” helpers as Prefect tasks, you must call them with `.fn(...)` inside the transactional task so they execute as plain functions (no orchestration), in the same thread and transaction:
```
process_and_save_transaction_breakdowns.fn(transaction, conn)
```
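The all-or-nothing behavior of this pattern can be checked in isolation. The sketch below is only an illustration under stated assumptions: it uses the standard-library `sqlite3` module as a stand-in for your SQLAlchemy connection, and the `items` table and the `save_step`, `run_atomic`, and `count_rows` names are hypothetical. As in the task above, every helper receives the same connection, and any error rolls back all writes from that run before the exception is re-raised.

```python
import sqlite3


def save_step(conn: sqlite3.Connection, name: str, fail: bool = False) -> None:
    """Hypothetical helper: writes one row using the caller's connection."""
    if fail:
        raise ValueError(f"step {name!r} failed")
    conn.execute("INSERT INTO items (name) VALUES (?)", (name,))


def run_atomic(conn: sqlite3.Connection, steps: list[tuple[str, bool]]) -> None:
    """Run all steps in one transaction; undo everything if any step raises."""
    try:
        for name, fail in steps:
            save_step(conn, name, fail)
        conn.commit()
    except Exception:
        conn.rollback()  # discards every uncommitted write from this run
        raise


def count_rows(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (name TEXT)")
conn.commit()

# First run: both steps succeed, so both rows are committed.
run_atomic(conn, [("metadata", False), ("contexts", False)])

# Second run: the first step writes a row, the second step fails,
# so the rollback removes the row written by the first step as well.
try:
    run_atomic(conn, [("breakdowns", False), ("items_breakdowns", True)])
except ValueError:
    pass

print(count_rows(conn))
```

Only the rows from the successful run remain; the partial write from the failed run is gone.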
Notes and gotchas
- Do not pass a SQLAlchemy Connection created in the flow into tasks. The flow runs in the main thread, while tasks may run in worker threads; a SQLAlchemy Connection is not thread-safe and should not be shared across threads. Always open the connection inside the task that uses it.
- Prefer a context manager to reduce boilerplate:
  - If get_conn() returns an Engine: `with engine.begin() as conn: ...` commits on success and rolls back on exception.
  - If it returns a Connection: `with conn.begin(): ...` is equivalent.
- Make sure every `DataFrame.to_sql(..., con=conn, ...)` call uses the same conn from the active transaction.
- If you truly need separate Prefect task runs for each step (for retries, mapping, etc.), you cannot reliably share a single DB transaction across them. In that case, consider:
  - Making each task idempotent and handling compensation on failure, or
  - Moving the multi-step transaction logic into a stored procedure and calling it from a single task.
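For the idempotent-task option, one common approach is to key each write on a stable identifier so that a retried task overwrites its earlier row instead of duplicating it. The sketch below is a minimal illustration, not your schema: it again uses the standard-library `sqlite3` module as a stand-in database, and the `transaction_metadata` table, its `transaction_id` and `status` columns, and the `save_metadata` helper are all hypothetical. It assumes the database supports `INSERT ... ON CONFLICT` upserts (SQLite 3.24+ and PostgreSQL do).

```python
import sqlite3


def save_metadata(conn: sqlite3.Connection, transaction_id: str, status: str) -> None:
    """Hypothetical idempotent write: safe to call again after a retry."""
    with conn:  # sqlite3: commits on success, rolls back on exception
        conn.execute(
            """
            INSERT INTO transaction_metadata (transaction_id, status)
            VALUES (?, ?)
            ON CONFLICT(transaction_id) DO UPDATE SET status = excluded.status
            """,
            (transaction_id, status),
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transaction_metadata (transaction_id TEXT PRIMARY KEY, status TEXT)"
)

save_metadata(conn, "T-1", "processed")
save_metadata(conn, "T-1", "processed")  # simulated retry of the same task
save_metadata(conn, "T-2", "processed")

rows = conn.execute(
    "SELECT transaction_id, status FROM transaction_metadata ORDER BY transaction_id"
).fetchall()
print(rows)
```

Because each step commits on its own here, a failure in a later step leaves earlier steps in place; that is the trade-off against the single-transaction task, and it is why compensation logic may still be needed.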
If you share what get_conn() returns (Engine vs Connection, SQLAlchemy 1.4 vs 2.x), I can tailor the `begin()`/context-manager version precisely and update the pd.read_sql call accordingly.