Tanya Hristova
10/31/2025, 10:17 AM
Marvin
10/31/2025, 10:17 AM
Marvin
10/31/2025, 10:26 AM
search_api.map(queries) and process_record.map(records).
- Use as_completed to start post-processing as soon as each search returns.
- Control concurrency with a task runner (threads for I/O) and optionally global concurrency limits/rate limits for APIs.
- Add retries with backoff on the API calls.
Pattern A — Streamed: begin processing each list as soon as it’s ready
This minimizes end-to-end latency and doesn’t wait for all searches to finish.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed
from prefect.tasks import exponential_backoff
from prefect.concurrency.sync import rate_limit

@task(retries=3, retry_delay_seconds=exponential_backoff(2), log_prints=True)
def search_api(query: str) -> list[dict]:
    # Optional: enforce API rate limits globally (see CLI below);
    # rate_limit blocks until a slot is available, then returns
    rate_limit("my-api", timeout_seconds=30, strict=False)
    # call external API here
    # return a list of records
    ...

@task
def process_record(rec: dict) -> dict:
    # post-process a single record
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def pipeline(queries: list[str]):
    # Fan-out searches concurrently
    search_futures = search_api.map(queries)

    processed = []
    # As each search finishes, map the processing over its returned list
    for f in as_completed(search_futures):
        records = f.result()  # this is the list from that search
        processed.extend(process_record.map(records))

    # Optionally wait for all processing to complete and return results
    return [p.result() for p in processed]
Pattern B — Batch: wait for all searches, then process all records
This is simpler but waits for the entire first stage to finish.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.tasks import exponential_backoff
from prefect.concurrency.sync import rate_limit

@task(retries=3, retry_delay_seconds=exponential_backoff(2))
def search_api(query: str) -> list[dict]:
    # rate_limit blocks until a slot is available, then returns
    rate_limit("my-api", timeout_seconds=30, strict=False)
    ...

@task
def process_record(rec: dict) -> dict:
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def pipeline(queries: list[str]):
    lists = search_api.map(queries).result()  # resolve to list-of-lists
    flat_records = [rec for lst in lists for rec in lst]
    processed = process_record.map(flat_records)
    return [p.result() for p in processed]
Concurrency and rate limiting
- Task runner: For I/O-bound HTTP calls, threads are ideal. Control concurrency with max_workers on ThreadPoolTaskRunner. For CPU-heavy post-processing, consider ProcessPoolTaskRunner (see the sketch at the end of this section) or Dask if you need to scale out.
- Global concurrency limits and rate limits: use these to respect third-party API limits across all flows.
  - List commands: prefect gcl --help
  - Create a limit: prefect gcl create my-api --limit 10 --slot-decay-per-second 10
- Use in tasks:
from prefect.concurrency.sync import rate_limit, concurrency

# Rate limit (token bucket style); blocks until a slot is available, then returns
rate_limit("my-api", timeout_seconds=30, strict=False)
...

# Or a simple slot-based concurrency cap (context manager releases the slot on exit)
with concurrency("my-api-requests", occupy=1, timeout_seconds=30, strict=False):
    ...
- Docs: Global concurrency limits
- CLI flags confirmed via prefect gcl create --help
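If the post-processing stage is CPU-bound rather than I/O-bound, here is a minimal sketch of the process-based runner mentioned above (assuming ProcessPoolTaskRunner accepts max_workers like the thread runner; heavy_transform is a hypothetical task):

from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_transform(rec: dict) -> dict:
    # CPU-bound work (parsing, scoring, etc.) runs in a separate worker process
    ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def cpu_heavy_pipeline(records: list[dict]):
    # .map fans out across worker processes instead of threads
    return heavy_transform.map(records).result()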
Reliability tweaks
- Retries with backoff on network calls: @task(retries=3, retry_delay_seconds=exponential_backoff(2))
- Docs: Retries and backoff
- If individual records can fail but you want the flow to continue, wrap the upstream futures with allow_failure when passing them to downstream tasks (see the sketch after this list).
- If you need to observe futures as they complete: as_completed is great; wait is useful to partition into done/not-done if you need timeouts.
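A minimal sketch of the allow_failure pattern mentioned above (summarize is a hypothetical downstream task; process_record stands in for the task from the examples above):

from prefect import allow_failure, flow, task

@task
def process_record(rec: dict) -> dict:
    ...

@task
def summarize(results: list):
    # Runs even when some upstream records failed
    ...

@flow
def pipeline(records: list[dict]):
    futures = process_record.map(records)
    # Without allow_failure, a single failed mapped task would keep summarize from running
    summarize(allow_failure(futures))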
When to pick which pattern
- Use Pattern A (streamed) if you want to minimize latency and start processing as soon as possible.
- Use Pattern B (batch) if simplicity is more important and waiting for all searches first is okay.
Helpful docs
- Run work concurrently: https://docs-3.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
- Futures API: https://docs-3.prefect.io/v3/api-ref/python/prefect-futures
- Dask integration (if you need distributed): https://docs-3.prefect.io/integrations/prefect-dask/index
A couple clarifying questions to recommend the exact pattern:
- Do you want processing to start as soon as each search finishes, or is batching fine?
- Roughly how many queries and records are we talking about (to size concurrency and decide if chunking is needed)?
- Any specific API rate limits we should enforce (e.g., X requests per second)?
Tanya Hristova
10/31/2025, 12:31 PM
as_completed method.
Marvin
10/31/2025, 12:31 PM
Marvin
10/31/2025, 12:35 PM
prefect.futures.as_completed in Prefect 3.x.
What it does
- It’s a generator that yields task futures in the order they finish, not the order you submitted them.
- Use it to start processing results as soon as they’re ready (great for streaming fan-out/fan-in).
Key behavior
- Order: Completion order, not submission order.
- Timeouts: If you pass timeout=..., and no additional futures complete within that window, it raises TimeoutError. Futures that completed before the timeout are already yielded.
- Failures: It yields futures regardless of state (Completed, Failed, Cancelled). Calling future.result() on a failed future raises unless you pass raise_on_failure=False (see the sketch after this list).
- Multiple uses: You can call as_completed on the same set of futures multiple times; each call creates a new generator. Completed futures will be yielded immediately.
- Duplicates: If the same future appears more than once in the input list, it’s deduplicated internally.
- Early exit: You can break out of the loop anytime. It doesn’t affect the running tasks.
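As a small illustration of the failure behavior above, a sketch that avoids try/except by passing raise_on_failure=False (assuming result(raise_on_failure=False) returns the exception for a failed task; futures and handle are placeholders):

from prefect.futures import as_completed

for fut in as_completed(futures):
    # With raise_on_failure=False, a failed task returns its exception instead of raising
    value = fut.result(raise_on_failure=False)
    if isinstance(value, Exception):
        print(f"Task failed: {value}")
    else:
        handle(value)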
Signature
from prefect.futures import as_completed
for f in as_completed(futures, timeout=None):
    ...
Common patterns
1) Stream results as they finish (handle success/failure)
from prefect.futures import as_completed

futures = search_api.map(queries)  # returns a PrefectFutureList

for fut in as_completed(futures):
    try:
        result = fut.result()  # may raise if task failed
    except Exception as exc:
        # Handle a failed or cancelled task
        print(f"Search failed: {exc}")
        continue
    # Process the successful result
    handle(result)
2) With timeout and partial progress
from prefect.futures import as_completed
from concurrent.futures import TimeoutError  # Python’s TimeoutError

processed = 0
try:
    for fut in as_completed(futures, timeout=10):
        value = fut.result()
        handle(value)
        processed += 1
except TimeoutError:
    print(f"Timed out after processing {processed} results.")
3) Streaming a second mapping based on each completed future
This is how you start processing each returned list as soon as it’s ready:
from prefect import flow, task
from prefect.futures import as_completed

@task
def search_api(q) -> list[dict]:
    ...

@task
def process_record(rec: dict):
    ...

@flow
def pipeline(queries: list[str]):
    search_futures = search_api.map(queries)
    processed_futures = []
    for fut in as_completed(search_futures):
        records = fut.result()  # the list from that specific search
        processed_futures.extend(process_record.map(records))
    # Optionally wait for all processing to finish
    return [pf.result() for pf in processed_futures]
When to choose as_completed vs wait()
- Use as_completed when you want a streaming pipeline (react to work finishing).
- Use wait(futures) when you want a snapshot: split into done/not_done at a moment in time, then decide what to do next.
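For the snapshot case, a minimal sketch assuming prefect.futures.wait mirrors concurrent.futures.wait and returns done/not_done sets (search_api, queries, and handle are placeholders from the examples above):

from prefect.futures import wait

futures = search_api.map(queries)

# Snapshot: which searches finished within 30 seconds, and which are still running?
done, not_done = wait(futures, timeout=30)
for fut in done:
    handle(fut.result())
print(f"{len(not_done)} searches still pending after the timeout")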
Docs and references
- API reference (futures): prefect.futures
- Concurrency patterns: Run work concurrently
If you share:
- about how many futures (tasks) you expect,
- whether you want to stop on first error or keep going,
- and if you need a global timeout for the whole stage,
I can tailor an example that matches your exact use case.
Tanya Hristova
10/31/2025, 12:41 PM
Marvin
10/31/2025, 12:41 PM
Marvin
10/31/2025, 12:42 PM
from prefect import flow, task
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task
def search_api(query: str) -> list[dict]:
    # call external API
    ...

@task
def process_record(rec: dict) -> dict:
    # post-process one record
    ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def pipeline(queries: list[str]):
    # Fan-out: run all searches concurrently
    search_futs = search_api.map(queries)

    processed_futs = []
    # Streaming fan-in: as each search completes, immediately process its records
    for fut in as_completed(search_futs):
        records = fut.result()  # this search’s list
        processed_futs.extend(process_record.map(records))

    # Optionally wait for all processing to finish (true fan-in)
    return [pf.result() for pf in processed_futs]
Notes
- as_completed yields futures in completion order, not submission order.
- You can combine this with rate limits and retries on the API task.
- If you want to persist or aggregate results incrementally (to a DB, S3, etc.), call that in the loop so slow searches don’t block fast ones.
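For example, a sketch of incremental persistence inside the loop (save_records is a hypothetical task; search_api and process_record are stubs standing in for the tasks above):

from prefect import flow, task
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task
def search_api(query: str) -> list[dict]:
    ...  # call the external API

@task
def process_record(rec: dict) -> dict:
    ...  # post-process one record

@task(retries=3)
def save_records(recs: list[dict]):
    ...  # hypothetical: write one search's processed records to a DB or S3

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def pipeline(queries: list[str]):
    for fut in as_completed(search_api.map(queries)):
        processed = process_record.map(fut.result())
        # Persist each search's batch as soon as it is processed,
        # so slow searches don't hold back the fast ones
        save_records(processed)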
Good references
- Concurrent execution patterns: Run work concurrently
- Futures API: prefect.futures
If you share your expected number of searches/records and any API rate limits, I can sketch a tuned version with proper concurrency caps and backoff.
Tanya Hristova
11/04/2025, 8:53 AM
Marvin
11/04/2025, 8:53 AM
Marvin
11/04/2025, 9:01 AM
Example 1 — Map processing over records, then one batch write

from prefect import flow, task, unmapped, tags
from prefect.tasks import exponential_backoff

@task
def process_record(rec: dict) -> dict:
    # pure transform, no DB IO
    # ... do CPU/light IO work ...
    return {"id": rec["id"], "value": rec["value"] * 2}

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(2),
)
def write_batch(processed: list[dict]):
    # Use your DB client (SQLAlchemy example shown)
    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql+psycopg2://user:pass@host:5432/db")

    # single transaction for the whole batch write
    with engine.begin() as conn:
        # Example bulk upsert; adapt to your dialect
        stmt = text("""
            INSERT INTO my_table (id, value)
            VALUES (:id, :value)
            ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value
        """)
        conn.execute(stmt, processed)

@flow
def process_and_write(records: list[dict]):
    processed_futures = process_record.map(records)

    # Optional: limit concurrency on the write using a tag
    with tags("db_write"):
        write_batch(processed_futures)

# process_and_write(records)  # call with your batch
Example 2 — Chunk first to control memory, then single batch write
from prefect import flow, task, tags
from prefect.utilities.collections import batched_iterable
from prefect.tasks import exponential_backoff

@task
def process_chunk(chunk: list[dict]) -> list[dict]:
    return [{"id": r["id"], "value": r["value"] * 2} for r in chunk]

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(2),
)
def write_batch(all_processed: list[list[dict]]):
    # flatten
    flat = [item for sub in all_processed for item in sub]
    # write once, as above...
    # with engine.begin() as conn: conn.execute(..., flat)

@flow
def process_and_write_chunked(records: list[dict], batch_size: int = 5_000):
    chunks = [list(c) for c in batched_iterable(records, batch_size)]
    processed_per_chunk = process_chunk.map(chunks)
    with tags("db_write"):
        write_batch(processed_per_chunk)
Operational tips
- Concurrency limits: If multiple flow runs may hit your DB, use tag-based concurrency limits on the “db_write” tag to cap concurrent batch writes (CLI example after this list). See Tag-based concurrency limits and Global concurrency limits.
- Retries and backoff: Configure retries on the batch write task for transient DB errors. Docs: Retries.
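For example, to allow at most one concurrent batch write across all runs tagged “db_write”:

prefect concurrency-limit create db_write 1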
Marvin
11/04/2025, 9:01 AM
Tanya Hristova
11/04/2025, 9:29 AM
Marvin
11/04/2025, 9:29 AM
Marvin
11/04/2025, 9:31 AM
Marvin
11/04/2025, 9:31 AM
Tanya Hristova
11/05/2025, 11:04 AM
Marvin
11/05/2025, 11:04 AM
Marvin
11/05/2025, 11:06 AM
1) Reuse a shared requests.Session
- Use one requests.Session per process/thread with a bigger pool and pool_block=True (so callers wait instead of oversubscribing).
- Make sure you fully consume/close responses so connections return to the pool.
import threading

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

_thread_local = threading.local()

def get_session():
    if not hasattr(_thread_local, "session"):
        s = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
        )
        adapter = HTTPAdapter(
            pool_connections=100,  # total pools
            pool_maxsize=100,      # per-host concurrent keep-alive connections
            max_retries=retry,
            pool_block=True,       # block instead of creating/discarding extras
        )
        s.mount("http://", adapter)
        s.mount("https://", adapter)
        _thread_local.session = s
    return _thread_local.session

def http_get(url, **kwargs):
    session = get_session()
    # Ensure the response is closed or fully read so the connection is returned to the pool
    with session.get(url, timeout=30, **kwargs) as r:
        r.raise_for_status()
        return r.json()
Notes:
- Don’t create a new Session per request; reuse the configured one.
- If you use stream=True, read to EOF and close, or the connection won’t be returned to the pool.
2) Reduce concurrency from Prefect
- If lots of tasks call the same host concurrently, cap them with a tag-based concurrency limit so you don’t exceed your pool.
- CLI (verified):
prefect concurrency-limit create http_outbound 20
- Then tag the tasks that make HTTP calls:
from prefect import task, tags

@task
def fetch(url):
    return http_get(url)

# when calling the task
with tags("http_outbound"):
    fetch.map(urls)
- Docs: Tag-based concurrency limits and Global concurrency limits.
You can also limit local parallelism:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def my_flow():  # add your parameters here
    ...
3) Match pool sizing to reality
- If your upstream or networking stack limits connections (e.g., 50 per client), set pool_maxsize and Prefect concurrency to stay within that.
- If you’re running in multiple pods/workers, remember the limit applies per process; multiply accordingly.
If you’re using an async client instead of requests:
- httpx: configure limits with httpx.Limits(max_keepalive_connections=..., max_connections=...) and reuse a single Client (see the sketch after this list).
- aiohttp: reuse a ClientSession with a TCPConnector(limit=..., limit_per_host=...).
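A minimal httpx sketch (the pool sizes and timeout below are illustrative assumptions, not recommendations):

import httpx

# One shared client per process; reuse it across calls so connections are pooled
limits = httpx.Limits(max_connections=20, max_keepalive_connections=10)
client = httpx.Client(limits=limits, timeout=30.0)

def http_get_json(url: str) -> dict:
    r = client.get(url)
    r.raise_for_status()
    return r.json()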
If you share a bit about how you’re calling that 172.19.8.47 endpoint (requests vs httpx/aiohttp, per-task or shared session), I can tailor the exact code pattern.
Tanya Hristova
11/05/2025, 11:26 AM
Marvin
11/05/2025, 11:26 AM
Marvin
11/05/2025, 11:27 AM
Marvin
11/05/2025, 11:27 AM