# marvin-ai
t
Hi @Marvin! I have the following case: I have multiple search tasks (calls to an external API) and each search returns a list of records. For each list, I need to post-process the data further - probably in a separate task asynchronously. What is the best approach to do this?
m
thought for 528.8 seconds
Great question — this is a classic fan-out/fan-in pattern in Prefect 3.x. Here are the recommended approaches, depending on whether you want to start processing as soon as each search finishes (streaming) or wait until all searches are done (batch). Assumptions: Prefect 3.x; your API calls are I/O bound.

Core building blocks
- Use task mapping for fan-out: `search_api.map(queries)` and `process_record.map(records)`.
- Use `as_completed` to start post-processing as soon as each search returns.
- Control concurrency with a task runner (threads for I/O) and, optionally, global concurrency limits/rate limits for APIs.
- Add retries with backoff on the API calls.

Pattern A — Streamed: begin processing each list as soon as it’s ready. This minimizes end-to-end latency and doesn’t wait for all searches to finish.
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed
from prefect.tasks import exponential_backoff
from prefect.concurrency.sync import rate_limit

@task(retries=3, retry_delay_seconds=exponential_backoff(2), log_prints=True)
def search_api(query: str) -> list[dict]:
    # Optional: enforce API rate limits globally (see CLI below)
    with rate_limit("my-api", timeout_seconds=30, strict=False):
        # call external API here
        # return a list of records
        ...

@task
def process_record(rec: dict) -> dict:
    # post-process a single record
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def pipeline(queries: list[str]):
    # Fan-out searches concurrently
    search_futures = search_api.map(queries)

    processed = []
    # As each search finishes, map the processing over its returned list
    for f in as_completed(search_futures):
        records = f.result()   # this is the list from that search
        processed.extend(process_record.map(records))

    # Optionally wait for all processing to complete and return results
    return [p.result() for p in processed]
```
Pattern B — Batch: wait for all searches, then process all records. This is simpler but waits for the entire first stage to finish.
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.tasks import exponential_backoff
from prefect.concurrency.sync import rate_limit

@task(retries=3, retry_delay_seconds=exponential_backoff(2))
def search_api(query: str) -> list[dict]:
    with rate_limit("my-api", timeout_seconds=30, strict=False):
        ...

@task
def process_record(rec: dict) -> dict:
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def pipeline(queries: list[str]):
    lists = search_api.map(queries).result()  # resolve to list-of-lists
    flat_records = [rec for lst in lists for rec in lst]
    processed = process_record.map(flat_records)
    return [p.result() for p in processed]
```
Concurrency and rate limiting
- Task runner: For I/O-bound HTTP calls, threads are ideal. Control concurrency with `max_workers` on `ThreadPoolTaskRunner`. For CPU-heavy post-processing, consider `ProcessPoolTaskRunner` or Dask if you need to scale out.
- Global concurrency limits and rate limits: Use these to respect third-party API limits across all flows.
  - List commands: `prefect gcl --help`
  - Create a limit: `prefect gcl create my-api --limit 10 --slot-decay-per-second 10`
  - Use in tasks:
```
from prefect.concurrency.sync import rate_limit, concurrency

# Rate limit (token bucket style)
with rate_limit("my-api", timeout_seconds=30, strict=False):
    ...

# Or a simple slot-based concurrency cap
with concurrency("my-api-requests", occupy=1, timeout_seconds=30, strict=False):
    ...
```
- Docs: Global concurrency limits; CLI verified via `prefect gcl create --help`.

Reliability tweaks
- Retries with backoff on network calls: `@task(retries=3, retry_delay_seconds=exponential_backoff(2))`. Docs: Retries and backoff.
- If individual records can fail but you want the flow to continue, you can wrap them with `allow_failure` when mapping (see the sketch at the end of this message).
- If you need to observe futures as they complete: `as_completed` is great; `wait` is useful to partition into done/not-done if you need timeouts.

When to pick which pattern
- Use Pattern A (streamed) if you want to minimize latency and start processing as soon as possible.
- Use Pattern B (batch) if simplicity is more important and waiting for all searches first is okay.

Helpful docs
- Run work concurrently: https://docs-3.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
- Futures API: https://docs-3.prefect.io/v3/api-ref/python/prefect-futures
- Dask integration (if you need distributed): https://docs-3.prefect.io/integrations/prefect-dask/index

A couple of clarifying questions to recommend the exact pattern:
- Do you want processing to start as soon as each search finishes, or is batching fine?
- Roughly how many queries and records are we talking about (to size concurrency and decide if chunking is needed)?
- Any specific API rate limits we should enforce (e.g., X requests per second)?
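One more note on the `allow_failure` option mentioned above: a minimal sketch, with a placeholder `search_api` task standing in for the API call from Pattern A and a hypothetical `summarize` reduce step, of letting the flow continue past individual search failures.
```
from prefect import allow_failure, flow, task

@task
def search_api(query: str) -> list[dict]:
    # placeholder for the external API call from Pattern A
    ...

@task
def summarize(all_results: list):
    # With allow_failure, this reduce step still runs if some searches failed;
    # failed upstreams typically arrive as exception instances, so filter them out.
    good = [r for r in all_results if not isinstance(r, Exception)]
    print(f"{len(good)} of {len(all_results)} searches succeeded")
    return good

@flow
def searches_with_partial_failures(queries: list[str]):
    futures = search_api.map(queries)
    # Wrap the mapped futures so one failed search does not block the reduce step
    return summarize(allow_failure(futures))
```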
t
@Marvin Can you elaborate on the `as_completed` method?
m
thought for 235.3 seconds
Happy to! Here’s a practical deep-dive on `prefect.futures.as_completed` in Prefect 3.x.

What it does
- It’s a generator that yields task futures in the order they finish, not the order you submitted them.
- Use it to start processing results as soon as they’re ready (great for streaming fan-out/fan-in).

Key behavior
- Order: Completion order, not submission order.
- Timeouts: If you pass `timeout=...` and no additional futures complete within that window, it raises `TimeoutError`. Futures that completed before the timeout are already yielded.
- Failures: It yields futures regardless of state (Completed, Failed, Cancelled). Calling `future.result()` on a failed future raises unless you pass `raise_on_failure=False`.
- Multiple uses: You can call `as_completed` on the same set of futures multiple times; each call creates a new generator. Completed futures will be yielded immediately.
- Duplicates: If the same future appears more than once in the input list, it’s deduplicated internally.
- Early exit: You can break out of the loop anytime. It doesn’t affect the running tasks.

Signature
```
from prefect.futures import as_completed

for f in as_completed(futures, timeout=None):
    ...
```
Common patterns

1) Stream results as they finish (handle success/failure)
```
from prefect.futures import as_completed

futures = search_api.map(queries)  # returns a PrefectFutureList

for fut in as_completed(futures):
    try:
        result = fut.result()  # may raise if task failed
    except Exception as exc:
        # Handle a failed or cancelled task
        print(f"Search failed: {exc}")
        continue

    # Process the successful result
    handle(result)
```
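If you prefer not to use try/except, the `raise_on_failure=False` flag described above can do the same job; a minimal sketch (`handle` is the same stand-in as in the snippet above):
```
from prefect.futures import as_completed

for fut in as_completed(futures):
    outcome = fut.result(raise_on_failure=False)
    if isinstance(outcome, Exception):
        # The task failed; outcome is the exception rather than a value
        print(f"Search failed: {outcome}")
        continue
    handle(outcome)
```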
2) With timeout and partial progress
```
from prefect.futures import as_completed
from concurrent.futures import TimeoutError  # Python’s TimeoutError

processed = 0

try:
    for fut in as_completed(futures, timeout=10):
        value = fut.result()
        handle(value)
        processed += 1
except TimeoutError:
    print(f"Timed out after processing {processed} results.")
3) Streaming a second mapping based on each completed future. This is how you start processing each returned list as soon as it’s ready:
```
from prefect import flow, task
from prefect.futures import as_completed

@task
def search_api(q) -> list[dict]:
    ...

@task
def process_record(rec: dict):
    ...

@flow
def pipeline(queries: list[str]):
    search_futures = search_api.map(queries)

    processed_futures = []
    for fut in as_completed(search_futures):
        records = fut.result()  # the list from that specific search
        processed_futures.extend(process_record.map(records))

    # Optionally wait for all processing to finish
    return [pf.result() for pf in processed_futures]
```
When to choose as_completed vs wait()
- Use `as_completed` when you want a streaming pipeline (react to work finishing).
- Use `wait(futures)` when you want a snapshot: split into done/not_done at a moment in time, then decide what to do next (a minimal sketch follows below).

Docs and references
- API reference (futures): prefect.futures
- Concurrency patterns: Run work concurrently

If you share:
- about how many futures (tasks) you expect,
- whether you want to stop on first error or keep going,
- and whether you need a global timeout for the whole stage,
I can tailor an example that matches your exact use case.
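For comparison, a minimal sketch of the snapshot-style `wait` mentioned above. This assumes `prefect.futures.wait`, which mirrors `concurrent.futures.wait` and returns done/not_done sets of futures:
```
from prefect.futures import wait

futures = search_api.map(queries)

# Snapshot after 30 seconds: which searches are done, which are still running?
done, not_done = wait(futures, timeout=30)

finished = [f.result() for f in done]
print(f"{len(done)} finished, {len(not_done)} still pending")
```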
t
@Marvin What is streaming fan-out/fan-in?
m
thought for 18.0 seconds
Streaming fan-out/fan-in is a workflow pattern where you:
- Fan-out: launch many independent tasks in parallel (e.g., many API searches).
- Streamed fan-in: as each upstream task finishes, immediately begin its downstream work (e.g., process each returned list), instead of waiting for all upstream tasks to finish.

Why it’s useful
- Lower end-to-end latency: you start processing early results right away.
- Better throughput/backpressure: you don’t build a huge intermediate buffer in memory.
- Resilient to long tails: slow searches don’t block processing of fast ones.

In Prefect 3.x, you implement this with `as_completed`:
```
from prefect import flow, task
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task
def search_api(query: str) -> list[dict]:
    # call external API
    ...

@task
def process_record(rec: dict) -> dict:
    # post-process one record
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def pipeline(queries: list[str]):
    # Fan-out: run all searches concurrently
    search_futs = search_api.map(queries)

    processed_futs = []

    # Streaming fan-in: as each search completes, immediately process its records
    for fut in as_completed(search_futs):
        records = fut.result()  # this search’s list
        processed_futs.extend(process_record.map(records))

    # Optionally wait for all processing to finish (true fan-in)
    return [pf.result() for pf in processed_futs]
```
Notes
- `as_completed` yields futures in completion order, not submission order.
- You can combine this with rate limits and retries on the API task.
- If you want to persist or aggregate results incrementally (to a DB, S3, etc.), call that in the loop so slow searches don’t block fast ones (see the sketch below).

Good references
- Concurrent execution patterns: Run work concurrently
- Futures API: prefect.futures

If you share your expected number of searches/records and any API rate limits, I can sketch a tuned version with proper concurrency caps and backoff.
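A minimal sketch of the incremental persistence idea from the notes above. `save_results` is a hypothetical sink task and `search_api` is a placeholder for the API task from the example; each search's records are written as soon as they are ready instead of being buffered:
```
from prefect import flow, task
from prefect.futures import as_completed

@task
def search_api(query: str) -> list[dict]:
    # placeholder for the external API call
    ...

@task
def save_results(records: list[dict]) -> int:
    # hypothetical sink: write this batch to a DB table, S3 object, etc.
    ...
    return len(records)

@flow
def pipeline_with_incremental_saves(queries: list[str]):
    search_futs = search_api.map(queries)

    save_futs = []
    for fut in as_completed(search_futs):
        records = fut.result()
        # Persist each search's records immediately instead of buffering everything
        save_futs.append(save_results.submit(records))

    return [f.result() for f in save_futs]
```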
t
@Marvin As postprocessing of a single record includes writing in the DB, I would like to execute these writes over a whole batch. What processing strategy would you recommend?
m
thought for 456.7 seconds
Great question. In Prefect 3.x, the most common pattern is to keep record-level processing parallelized, then “reduce” all results into a single batch write task. That gives you the best of both worlds: fast parallel processing and one atomic DB write.

Recommended strategies (pick based on batch size and failure semantics):
- Map → Reduce to one batch write (simple, one transaction)
  - Map a pure “process_record” task over all records
  - Pass the mapped futures directly to a single downstream “write_batch” task
  - Prefect will resolve those futures into a list for you
- Chunked processing → Single batch write (memory-friendly)
  - Chunk input first, map a “process_chunk” task, then send the list-of-lists to one “write_batch” task and flatten there
- Chunked processing → Chunked writes (for very large batches or tight timeouts)
  - Process and write by chunk in separate tasks; use DB transactions per chunk and accept partial commits (a per-chunk sketch follows Example 2)

Example 1 — Map then reduce to a single batch write
```
from prefect import flow, task, unmapped, tags
from prefect.tasks import exponential_backoff

@task
def process_record(rec: dict) -> dict:
    # pure transform, no DB IO
    # ... do CPU/light IO work ...
    return {"id": rec["id"], "value": rec["value"] * 2}

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(2),
)
def write_batch(processed: list[dict]):
    # Use your DB client (SQLAlchemy example shown)
    from sqlalchemy import create_engine, text
    engine = create_engine("<postgresql+psycopg2://user:pass@host:5432/db>")

    # single transaction for the whole batch write
    with engine.begin() as conn:
        # Example bulk upsert — adapt to your dialect
        stmt = text("""
            INSERT INTO my_table (id, value)
            VALUES (:id, :value)
            ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value
        """)
        conn.execute(stmt, processed)

@flow
def process_and_write(records: list[dict]):
    processed_futures = process_record.map(records)

    # Optional: limit concurrency on the write using a tag
    with tags("db_write"):
        write_batch(processed_futures)

# process_and_write(records)  # call with your batch
```
Example 2 — Chunk first to control memory, then single batch write
```
from prefect import flow, task, tags
from prefect.utilities.collections import batched_iterable
from prefect.tasks import exponential_backoff

@task
def process_chunk(chunk: list[dict]) -> list[dict]:
    return [{"id": r["id"], "value": r["value"] * 2} for r in chunk]

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(2),
)
def write_batch(all_processed: list[list[dict]]):
    # flatten
    flat = [item for sub in all_processed for item in sub]
    # write once, as above...
    # with engine.begin() as conn: conn.execute(..., flat)

@flow
def process_and_write_chunked(records: list[dict], batch_size: int = 5_000):
    chunks = [list(c) for c in batched_iterable(records, batch_size)]
    processed_per_chunk = process_chunk.map(chunks)
    with tags("db_write"):
        write_batch(processed_per_chunk)
```
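For completeness, a minimal sketch of the third strategy from the list above (chunked processing with chunked writes). Each chunk gets its own transaction, so partial commits are possible; `write_chunk` is a hypothetical per-chunk variant of `write_batch`:
```
from prefect import flow, task, tags
from prefect.tasks import exponential_backoff
from prefect.utilities.collections import batched_iterable

@task
def process_chunk(chunk: list[dict]) -> list[dict]:
    return [{"id": r["id"], "value": r["value"] * 2} for r in chunk]

@task(retries=3, retry_delay_seconds=exponential_backoff(2))
def write_chunk(processed: list[dict]):
    # One transaction per chunk; a failed chunk can be retried independently
    # with engine.begin() as conn: conn.execute(..., processed)
    ...

@flow
def process_and_write_per_chunk(records: list[dict], batch_size: int = 5_000):
    chunks = [list(c) for c in batched_iterable(records, batch_size)]
    processed_per_chunk = process_chunk.map(chunks)
    with tags("db_write"):
        # Each chunk is written as soon as its processing finishes
        write_futs = write_chunk.map(processed_per_chunk)
    return [f.result() for f in write_futs]
```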
Operational tips
- Concurrency limits: If multiple flow runs may hit your DB, use tag-based concurrency limits on the “db_write” tag to cap concurrent batch writes. See Tag-based concurrency limits and Global concurrency limits.
- Retries and backoff: Configure retries on the batch write task for transient DB errors. Docs: Retries.
- Mapping/reduce pattern: Prefect will automatically resolve mapped futures passed to a downstream task, enabling elegant “reduce” patterns. See Run work concurrently and this overview of mapping at scale: Task mapping at scale.

A couple of clarifying questions to tailor this further:
- Which database are you targeting (Postgres, Snowflake, BigQuery, etc.)?
- Are you doing inserts only or upserts/merges?
- How large can a “batch” get, and do you have memory constraints?
- Do you need a single transaction across the whole batch, or is per-chunk commit acceptable?
With those details, I can share a dialed-in example (including SQL and driver-specific bulk operations).
t
@Marvin Processing of initial records ends up in enhanced records + multiple other corresponding objects. I need an approach that includes writing all of the resulting objects into many DB tables.
m
thought for 118.9 seconds
Got it — you’re turning one “initial” record into a bundle of related objects that must be persisted across multiple tables. The key is to keep record-level processing parallelized, then perform a single, centralized, multi-table write in a single database transaction to preserve referential integrity.

Recommended strategy in Prefect 3.x
- Map for processing, reduce for writing:
  - Map a pure processing task over the input records; return a structured payload that contains rows for each target table.
  - Pass the list of mapped results to a single downstream write task. Prefect will resolve the futures and pass you a concrete list.
  - Inside the write task, open one DB transaction and write all tables in the correct order. This is the only way to guarantee ACID across multiple tables in Prefect (transactions cannot span multiple tasks).
- Chunk if needed:
  - For large inputs, chunk first (e.g., 5k–50k records per chunk), map a process_chunk task, then reduce to one write task that flattens and writes everything. Or, if volumes are huge, process/write per chunk with one transaction per chunk.
- Use deterministic keys:
  - If possible, generate stable IDs (e.g., UUIDs) during processing for parent/child rows. This avoids “insert-then-fetch-ID” loops and simplifies child inserts.
  - If you must use DB-generated IDs, do an INSERT ... RETURNING for parents, build a mapping, then insert children.
- Retries and concurrency:
  - Put retries with exponential backoff on the write task.
  - Use tag-based concurrency limits on your write task (e.g., “db_write”) to protect the database from overload if multiple flows run concurrently.
  - Docs: Retries, Tag-based concurrency limits, Run work concurrently.

Example: multi-table write with a single transaction (Postgres + SQLAlchemy Core)
```
from typing import Dict, List

from prefect import flow, task, tags
from prefect.tasks import exponential_backoff

# Shape returned by processing: rows for each table
# {
#   "parents": [{"id": "...", "name": ...}, ...],
#   "children_a": [{"id": "...", "parent_id": "...", ...}, ...],
#   "children_b": [{"id": "...", "parent_id": "...", ...}, ...],
# }

@task
def process_record(rec: dict) -> Dict[str, List[dict]]:
    # Derive deterministic IDs (e.g., UUID5 from natural keys) to avoid round-trips
    parent_id = rec["id"]  # or your own uuid generation
    parent_row = {
        "id": parent_id,
        "name": rec["name"].strip(),
    }
    children_a = [
        {"id": f"{parent_id}-A-{i}", "parent_id": parent_id, "attr": v}
        for i, v in enumerate(rec.get("attrs_a", []))
    ]
    children_b = [
        {"id": f"{parent_id}-B-{i}", "parent_id": parent_id, "attr": v}
        for i, v in enumerate(rec.get("attrs_b", []))
    ]
    return {"parents": [parent_row], "children_a": children_a, "children_b": children_b}

@task
def coalesce(bundles: List[Dict[str, List[dict]]]) -> Dict[str, List[dict]]:
    # Flatten and deduplicate parents by id
    parents, children_a, children_b = [], [], []
    for b in bundles:
        parents.extend(b["parents"])
        children_a.extend(b["children_a"])
        children_b.extend(b["children_b"])
    # Deduplicate parent rows by id (last-write-wins or merge fields as you need)
    seen = {}
    for p in parents:
        seen[p["id"]] = p
    parents = list(seen.values())
    return {"parents": parents, "children_a": children_a, "children_b": children_b}

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(2),
)
def write_all(tables: Dict[str, List[dict]]):
    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql+psycopg2://user:pass@host:5432/db")

    parents = tables["parents"]
    children_a = tables["children_a"]
    children_b = tables["children_b"]

    # Do everything atomically
    with engine.begin() as conn:
        # Upsert parents first
        if parents:
            conn.execute(
                text("""
                    INSERT INTO parents (id, name)
                    VALUES (:id, :name)
                    ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
                """),
                parents,
            )
        # Then children A
        if children_a:
            conn.execute(
                text("""
                    INSERT INTO children_a (id, parent_id, attr)
                    VALUES (:id, :parent_id, :attr)
                    ON CONFLICT (id) DO UPDATE SET attr = EXCLUDED.attr
                """),
                children_a,
            )
        # Then children B
        if children_b:
            conn.execute(
                text("""
                    INSERT INTO children_b (id, parent_id, attr)
                    VALUES (:id, :parent_id, :attr)
                    ON CONFLICT (id) DO UPDATE SET attr = EXCLUDED.attr
                """),
                children_b,
            )

@flow
def pipeline(records: List[dict], chunk_size: int | None = None):
    if chunk_size:
        # Optional: chunk first if records are large
        from prefect.utilities.collections import batched_iterable

        chunks = [list(c) for c in batched_iterable(records, chunk_size)]
        processed_per_chunk = process_record.map([r for chunk in chunks for r in chunk])  # map flat
        tables = coalesce(processed_per_chunk)
    else:
        processed = process_record.map(records)
        tables = coalesce(processed)

    # Limit concurrency on DB writes if multiple runs may overlap
    with tags("db_write"):
        write_all(tables)
```

Notes and options
- If you must use DB-generated surrogate IDs:
  - Insert parents with INSERT ... RETURNING (or SQLAlchemy session.add_all + flush) to collect a mapping from natural key → id, then populate child rows with those IDs before inserting.
  - Keep this inside the same engine.begin() context.
- Data warehouses (Snowflake/BigQuery/Redshift):
  - Prefer staging + MERGE. E.g., write all processed outputs to staging tables/files (CSV/Parquet), then run MERGE statements for each target table in a single task (and transaction where supported). This is often much faster and more scalable than row-by-row upserts.
- Don’t split one multi-table transaction across multiple tasks:
  - Prefect tasks are separate units of work; you cannot span a single DB transaction across tasks. For all-or-nothing multi-table semantics, keep it in one write task.
- Concurrency/rate limiting and resiliency:
  - Add tag-based concurrency limits to “db_write” if multiple deployments run concurrently.
  - Use targeted retries (e.g., only retry on transient errors) via a custom retry_condition_fn if you need finer control.
  - Docs: Retries, Concurrency limits.

If you can share:
- Which database (Postgres, Snowflake, BigQuery, etc.)?
- Whether you can generate deterministic IDs vs. relying on DB-generated IDs?
- Expected batch size and latency requirements?
I can tailor the write task and SQL (including MERGE patterns) to your stack.
t
@Marvin Explain how to solve this: 121317.224 | WARNING | urllib3.connectionpool - Connection pool is full, discarding connection: 172.19.8.47. Connection pool size: 10
m
thought for 129.0 seconds
That warning comes from urllib3/requests when you’re making more concurrent HTTP requests to the same host than the connection pool is sized for (default is 10). Extra connections get created and then discarded, which is noisy and less efficient. There are three levers to fix it:

1) Reuse a configured Session with a larger pool
- Create a single reusable `requests.Session` per process/thread with a bigger pool and pool_block=True (so callers wait instead of oversubscribing).
- Make sure you fully consume/close responses so connections return to the pool.
```
import threading
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

_thread_local = threading.local()

def get_session():
    if not hasattr(_thread_local, "session"):
        s = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
        )
        adapter = HTTPAdapter(
            pool_connections=100,   # total pools
            pool_maxsize=100,       # per-host concurrent keep-alive connections
            max_retries=retry,
            pool_block=True,        # block instead of creating/discarding extras
        )
        s.mount("http://", adapter)
        s.mount("https://", adapter)
        _thread_local.session = s
    return _thread_local.session

def http_get(url, **kwargs):
    session = get_session()
    # Ensure the response is closed or fully read so the connection is returned to the pool
    with session.get(url, timeout=30, **kwargs) as r:
        r.raise_for_status()
        return r.json()
```
Notes:
- Don’t create a new Session per request; reuse the configured one.
- If you use `stream=True`, read to EOF and close, or the connection won’t be returned to the pool.

2) Reduce concurrency from Prefect
- If lots of tasks call the same host concurrently, cap them with a tag-based concurrency limit so you don’t exceed your pool.
- CLI (verified):
```
prefect concurrency-limit create http_outbound 20
```
- Then tag the tasks that make HTTP calls:
```
from prefect import task, tags

@task
def fetch(url):
    return http_get(url)

# when calling the task
with tags("http_outbound"):
    fetch.map(urls)
```
- Docs: Tag-based concurrency limits and Global concurrency limits. You can also limit local parallelism:
```
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def my_flow(...):
    ...
```
3) Match pool sizing to reality
- If your upstream or networking stack limits connections (e.g., 50 per client), set `pool_maxsize` and Prefect concurrency to stay within that.
- If you’re running in multiple pods/workers, remember the limit applies per process; multiply accordingly.

If you’re using an async client instead of requests:
- httpx: configure limits with `httpx.Limits(max_keepalive_connections=..., max_connections=...)` and reuse a single Client (a minimal sketch follows below).
- aiohttp: reuse a `ClientSession` with a `TCPConnector(limit=..., limit_per_host=...)`.

If you share a bit about how you’re calling that 172.19.8.47 endpoint (requests vs httpx/aiohttp, per-task or shared session), I can tailor the exact code pattern.
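A minimal sketch of the httpx option mentioned above; the limit values are illustrative, and the single shared client is the important part:
```
import httpx

# One shared client for the whole process; don't create a new one per request
limits = httpx.Limits(max_keepalive_connections=20, max_connections=100)
client = httpx.Client(limits=limits, timeout=30.0)

def http_get(url: str) -> dict:
    r = client.get(url)
    r.raise_for_status()
    return r.json()
```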
t
@Marvin I need to perform batch insert of list of records but I use DB-generated IDs to reference them in the children objects. How to perform it in Prefect on a result from a mapped task over a list of objects.
m
thought for 58.3 seconds
You can do this reliably with a “map → reduce → single transaction” pattern:
- Map: process each input record into a bundle with a parent payload and child payloads that reference the parent by a stable client key (e.g., external_id), not the DB id.
- Reduce: collect all bundles into a single structure.
- Single write task: in one DB transaction, insert/upsert parents with INSERT … RETURNING (or ORM flush) to get DB-generated IDs, build a key→id map, then insert children using the mapped parent ids.

Key point: you cannot span a database transaction across multiple Prefect tasks. Do the multi-table write in a single task to keep it atomic.

Example (Postgres + SQLAlchemy Core; mapped results feed one write task)
```
from typing import Dict, List

from prefect import flow, task, unmapped, tags
from prefect.tasks import exponential_backoff

# Mapped task: return parent/children, children reference parent by a client key
@task
def process_record(rec: dict) -> Dict[str, List[dict]]:
    parent = {
        "external_id": rec["external_id"],  # unique natural key for upsert/id map
        "name": rec["name"],
    }
    children_a = [
        {"parent_external_id": rec["external_id"], "attr": v}
        for v in rec.get("attrs_a", [])
    ]
    children_b = [
        {"parent_external_id": rec["external_id"], "attr": v}
        for v in rec.get("attrs_b", [])
    ]
    return {"parents": [parent], "children_a": children_a, "children_b": children_b}

@task
def coalesce(bundles: List[Dict[str, List[dict]]]) -> Dict[str, List[dict]]:
    # Flatten and deduplicate parents by external_id
    parents, children_a, children_b = [], [], []
    for b in bundles:
        parents.extend(b["parents"])
        children_a.extend(b["children_a"])
        children_b.extend(b["children_b"])
    dedup = {}
    for p in parents:
        dedup[p["external_id"]] = p
    return {"parents": list(dedup.values()), "children_a": children_a, "children_b": children_b}

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(2),
)
def write_all(tables: Dict[str, List[dict]]):
    # Define your SQLAlchemy tables elsewhere and import them here.
    # Below shows raw SQL for clarity.
    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql+psycopg2://user:pass@host:5432/db")

    parents = tables["parents"]
    children_a = tables["children_a"]
    children_b = tables["children_b"]

    if not parents and not children_a and not children_b:
        return

    with engine.begin() as conn:
        # 1) Upsert parents and RETURNING (id, external_id) to build id map
        id_map = {}
        if parents:
            res = conn.execute(
                text("""
                    INSERT INTO parents (external_id, name)
                    VALUES (:external_id, :name)
                    ON CONFLICT (external_id) DO UPDATE SET name = EXCLUDED.name
                    RETURNING id, external_id
                """),
                parents,
            )
            rows = res.fetchall()
            id_map = {row.external_id: row.id for row in rows}

        # 2) Prepare children with db-generated parent_id
        def attach_parent_id(child_rows):
            out = []
            for r in child_rows:
                pid = id_map.get(r["parent_external_id"])
                if pid is None:
                    # Defensive: if upsert didn’t return, fetch mapping (rare for PG),
                    # or raise. Usually not needed for Postgres.
                    raise ValueError(f"Missing parent id for {r['parent_external_id']}")
                nr = dict(r)
                nr["parent_id"] = pid
                nr.pop("parent_external_id", None)
                out.append(nr)
            return out

        ins_children_a = attach_parent_id(children_a) if children_a else []
        ins_children_b = attach_parent_id(children_b) if children_b else []

        # 3) Insert/upsert children
        if ins_children_a:
            conn.execute(
                text("""
                    INSERT INTO children_a (parent_id, attr)
                    VALUES (:parent_id, :attr)
                    ON CONFLICT DO NOTHING
                """),
                ins_children_a,
            )
        if ins_children_b:
            conn.execute(
                text("""
                    INSERT INTO children_b (parent_id, attr)
                    VALUES (:parent_id, :attr)
                    ON CONFLICT DO NOTHING
                """),
                ins_children_b,
            )

@flow
def pipeline(records: List[dict]):
    processed = process_record.map(records)
    tables = coalesce(processed)

    # Optional: tag to cap concurrent writes across runs
    with tags("db_write"):
        write_all(tables)
```

Notes and alternatives
- ORM approach: add all Parent objects, session.flush() to populate DB-generated ids, build the map in-memory, then create/save Child objects. Keep everything within a single session/transaction (a minimal sketch follows below).
- MySQL or warehouses without robust RETURNING: upsert parents (ON DUPLICATE KEY UPDATE), then SELECT id, external_id WHERE external_id IN (...) to build the map, then insert children. Keep it in one transaction.
- Idempotency: ensure parents have a unique key (e.g., external_id) and use upserts so retries don’t duplicate data.
- Large batches: chunk parent upserts (e.g., 5k–50k rows), then fetch the mapping for that chunk and insert its children before moving to the next chunk, all within the same task.

Prefect specifics
- Passing mapped futures directly into `coalesce`/`write_all` is supported; Prefect resolves them into lists for you.
- Use retries on the write task and tag-based concurrency limits if multiple deployments hit the DB at once.
- Docs: Retries, Concurrency limits, Run work concurrently.

If you share your database (Postgres/MySQL/Snowflake/etc.) and whether you’re using Core or ORM, I can tailor the insert/upsert and RETURNING/SELECT mapping code precisely.
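A minimal sketch of the ORM alternative mentioned in the notes, using SQLAlchemy 2.0-style models. The `Parent`/`ChildA` models mirror the tables from the Core example and are assumptions; the point is the add-all, flush, then reference-generated-ids flow inside one transaction:
```
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class Parent(Base):
    __tablename__ = "parents"
    id: Mapped[int] = mapped_column(primary_key=True)  # DB-generated
    external_id: Mapped[str] = mapped_column(String, unique=True)
    name: Mapped[str]

class ChildA(Base):
    __tablename__ = "children_a"
    id: Mapped[int] = mapped_column(primary_key=True)
    parent_id: Mapped[int] = mapped_column(ForeignKey("parents.id"))
    attr: Mapped[str]

def write_all_orm(engine, bundles: list[dict]) -> None:
    # One session/transaction for the whole multi-table write
    with Session(engine) as session, session.begin():
        parents_by_key: dict[str, Parent] = {}
        for b in bundles:
            for p in b["parents"]:
                parent = Parent(external_id=p["external_id"], name=p["name"])
                session.add(parent)
                parents_by_key[p["external_id"]] = parent

        # flush() sends the parent INSERTs so DB-generated ids are populated
        session.flush()

        for b in bundles:
            for c in b["children_a"]:
                parent = parents_by_key[c["parent_external_id"]]
                session.add(ChildA(parent_id=parent.id, attr=c["attr"]))
    # exiting the session.begin() block commits; any error rolls everything back
```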