Angelo Tata
10/05/2025, 10:41 AM
Marvin
10/05/2025, 10:41 AM
Marvin
10/05/2025, 10:47 AM
max_workers
- Docs: Task runners (Prefect 3)
Example flow + task with concurrency guard
from hashlib import sha256
from pathlib import Path
import requests
from prefect import flow, task
from prefect.concurrency.sync import concurrency  # or rate_limit if you need RPS limiting
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
@task(persist_result=False)  # you're writing to files, so no need to persist task results
def fetch_and_save(url: str, out_dir: str):
    # At most N of these run at once across all flow runs sharing the "download" limit (N = its --limit)
    with concurrency("download", occupy=1):
        r = requests.get(url, timeout=30)
        r.raise_for_status()
    out_dir_p = Path(out_dir)
    out_dir_p.mkdir(parents=True, exist_ok=True)
    # sha256 gives the same file name on every run; built-in hash() of a str is salted per process
    (out_dir_p / f"{sha256(url.encode()).hexdigest()}.json").write_bytes(r.content)
@flow(name="bulk-download", task_runner=ThreadPoolTaskRunner(max_workers=64))
def bulk_download(urls: list[str], out_dir: str):
    # Submit tasks; Prefect will run up to max_workers concurrently
    futures = [fetch_and_save.submit(url, out_dir) for url in urls]
    # Wait for every task so the flow doesn't return with unresolved futures
    wait(futures)
# bulk_download(urls, "/tmp/json")
Set a global concurrency limit
- This caps concurrency across all flow runs that use concurrency("download")
- Create it once (adjust 64 to suit):
prefect global-concurrency-limit create download --limit 64
- CLI help: Global concurrency limit CLI
- Guide: How-to: Global concurrency limits
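To make the cap concrete without a Prefect server, here is a stdlib-only analogue: a threading.BoundedSemaphore stands in for the "download" GCL and a 64-thread pool stands in for ThreadPoolTaskRunner. This is a single-process model under that assumption; the real GCL is enforced by the Prefect server and also spans separate flow runs. The names run_with_cap and fake_download are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_cap(n_tasks: int, max_workers: int, limit: int) -> int:
    """Run n_tasks fake downloads on max_workers threads, admitting only `limit` at once.

    Returns the highest number of tasks observed inside the guarded section.
    """
    slots = threading.BoundedSemaphore(limit)  # plays the role of the "download" GCL
    lock = threading.Lock()
    active = 0
    peak = 0

    def fake_download(_: int) -> None:
        nonlocal active, peak
        with slots:  # analogous to `with concurrency("download", occupy=1):`
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)  # stand-in for network I/O
            with lock:
                active -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(fake_download, range(n_tasks)))
    return peak


if __name__ == "__main__":
    # 64 worker threads, but the guard lets at most 8 into the section at a time
    print(run_with_cap(n_tasks=200, max_workers=64, limit=8))
```

The thread pool size and the limit are independent knobs: extra threads simply queue on the semaphore, which mirrors why the GCL --limit, not max_workers, is the hard cap.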
Optional: rate limit (if the API has RPS rules)
- Create a rate-limited GCL with a burst and decay (tokens/sec). If you already created the download limit above, delete it first or use a different name. For example, burst 100 with 50 req/s:
prefect global-concurrency-limit create download --limit 100 --slot-decay-per-second 50
- In your task, use:
from prefect.concurrency.sync import rate_limit
rate_limit("download")  # blocks until a slot is free; a plain call, not a context manager
...
- Docs: Rate limits (concepts)
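The --limit / --slot-decay-per-second pair can be read as a token bucket: --limit is the burst size and the decay rate is the steady refill. Below is a simplified, single-process model of that reading, offered as an assumption about the semantics rather than Prefect's server-side implementation; TokenBucket and granted_in_window are hypothetical names, and the clock is passed in explicitly so the behaviour is deterministic.

```python
class TokenBucket:
    """Simplified model: capacity ~ GCL --limit, rate ~ --slot-decay-per-second."""

    def __init__(self, capacity: int, rate: float, now: float = 0.0) -> None:
        self.capacity = capacity
        self.rate = rate
        self.tokens = float(capacity)  # starts full: an idle limit allows a burst
        self.last = now

    def try_acquire(self, now: float) -> bool:
        # Refill in proportion to elapsed time, never above capacity
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


def granted_in_window(capacity: int, rate: float, attempts_per_s: int, seconds: int) -> int:
    """Count how many of a steady stream of attempts the bucket admits."""
    bucket = TokenBucket(capacity, rate)
    step = 1.0 / attempts_per_s
    granted = 0
    for i in range(attempts_per_s * seconds):
        if bucket.try_acquire(now=i * step):
            granted += 1
    return granted


if __name__ == "__main__":
    # burst 100, refill 50/s, callers trying 1000 times per second for 10 s:
    # roughly 100 (burst) + 50 * 10 (refill) requests get through
    print(granted_in_window(capacity=100, rate=50, attempts_per_s=1000, seconds=10))
```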
Choosing concurrency numbers
- Start with max_workers=64 and benchmark. On an M1 with 32 GB RAM, 64–128 is usually safe for I/O-bound network+file operations
- Watch CPU, memory, and network saturation; adjust up/down
- Keep the GCL --limit equal to your max_workers (or slightly lower) to enforce a hard cap across runs
macOS open file limits
- Each concurrent task will consume at least a socket and a file handle
- Check the limit from Python (Prefect utility):
from prefect.utilities.filesystem import get_open_file_limit
print(get_open_file_limit())
- Or in your shell:
ulimit -n
- If you raise concurrency above ~128, you may need to increase this limit (e.g., ulimit -n 10240 for the current shell/session). For a long-term change, macOS sets system-wide limits via launchctl (e.g., launchctl limit maxfiles).
- API reference: prefect.utilities.filesystem
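The soft and hard open-file limits that ulimit -n reports can also be read, and the soft limit raised for the current process only, with the stdlib resource module (macOS and Linux). A minimal sketch; ensure_nofile is a hypothetical helper name, and because macOS may refuse very large values even when the hard limit reports as unlimited, the sketch keeps the current value if setrlimit fails.

```python
import resource


def ensure_nofile(target: int) -> int:
    """Try to raise this process's soft open-file limit to `target`.

    Returns the soft limit in effect afterwards. Only the current process
    (and children it starts later) are affected, like `ulimit -n` in a shell.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY or soft >= target:
        return soft  # already high enough; never lower it
    # The soft limit may not exceed the hard limit
    new_soft = target if hard == resource.RLIM_INFINITY else min(target, hard)
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
    except (ValueError, OSError):
        pass  # e.g. macOS may reject values above its per-process cap
    return resource.getrlimit(resource.RLIMIT_NOFILE)[0]


if __name__ == "__main__":
    print("soft, hard:", resource.getrlimit(resource.RLIMIT_NOFILE))
    print("soft after:", ensure_nofile(4096))
```

Calling this at the top of the script that starts the worker is one way to avoid depending on how the launching shell was configured.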
Notes
- Work pool concurrency limits affect concurrent flow runs, not per-task concurrency within a single flow. With one worker and one flow, the task runner and GCL are the main controls
- With 100k tasks, consider turning off result persistence (as shown) to reduce orchestration overhead
If you share what URLs/service you’re hitting (and any RPS limits), I can help you pick an initial max_workers and rate limit that’s safe and fast.
Angelo Tata
10/05/2025, 11:02 AM
Marvin
10/05/2025, 3:20 PM
Marvin
10/05/2025, 3:21 PM
ThreadPoolTaskRunner (best for I/O-bound work)
- Concurrency: max_workers 64–128
- Rate limit: Prefect Global Concurrency Limit (GCL) at 50 RPS, used via rate_limit("tmdb")
Create the global rate limit
- This enforces 50 requests per second across all runs that use the same limit name:
prefect global-concurrency-limit create tmdb --limit 50 --slot-decay-per-second 50
- Docs: How-to: Global concurrency limits, CLI reference, Rate limits concept
Flow/task example
from hashlib import sha256
from pathlib import Path
import requests
from prefect import flow, task
from prefect.concurrency.sync import rate_limit
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
@task(persist_result=False, retries=3, retry_delay_seconds=5)
def fetch_and_save(url: str, out_dir: str):
    # Enforce TMDB rate limit globally; rate_limit() blocks until a slot is free
    # (it is a plain call, not a context manager)
    rate_limit("tmdb")
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    # Stable file name across runs (built-in hash() of a str is salted per process)
    (out / f"{sha256(url.encode()).hexdigest()}.json").write_bytes(r.content)
@flow(name="tmdb-downloads", task_runner=ThreadPoolTaskRunner(max_workers=128))
def tmdb_downloads(urls: list[str], out_dir: str):
    futures = [fetch_and_save.submit(url, out_dir) for url in urls]
    wait(futures)  # resolve all futures before the flow returns
# tmdb_downloads(urls, "/tmp/tmdb-json")
Choosing the numbers
- `max_workers`: 64–128 is a good starting range on an M1/32GB for I/O-heavy work. More threads won’t exceed 50 RPS (the GCL enforces it) but can help keep the pipeline full under variable latency.
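- GCL: --limit 50 --slot-decay-per-second 50 caps you at ~50 requests per second, with a burst of up to 50 at once after an idle period. If TMDB allows small bursts, this is fine; if not, lower --limit (e.g., to 10) to shrink the burst while --slot-decay-per-second 50 keeps the steady rate at 50 RPS.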
macOS open file limits
- 64–128 concurrent requests + file writes can hit the default macOS file limit.
- Check:
ulimit -n
- If needed, temporarily raise it in your shell before starting the worker (e.g., ulimit -n 4096). For persistent changes, use launchctl configuration.
Optional tips
- If you see 429s, keep the retries above or add backoff logic on 429 (for example, prefect.tasks.exponential_backoff can supply increasing retry_delay_seconds).
- Consider batching or chunking URL submission to keep memory steady when dispatching 100k tasks.
- You can also set a work pool concurrency limit if you want to limit concurrent flow runs, but it won’t change per-task throughput inside a single flow.
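For the "backoff logic on 429" tip, one common approach is to honour a numeric Retry-After header when the server sends one and otherwise fall back to capped exponential backoff with jitter. This sketch covers only the delay calculation and assumes you wire it into your own retry loop; backoff_delay is a hypothetical name, and it handles only the delta-seconds form of Retry-After, not the HTTP-date form.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: bool = True,
) -> float:
    """Seconds to sleep before retry number `attempt` (1-based)."""
    if retry_after is not None:
        try:
            # A server-provided delay wins (delta-seconds form only), still capped
            return min(cap, max(0.0, float(retry_after)))
        except ValueError:
            pass  # HTTP-date or malformed header: fall back to exponential backoff
    delay = min(cap, base * 2 ** (attempt - 1))
    # "Full jitter": pick uniformly in [0, delay] so many threads don't retry in lockstep
    return random.uniform(0, delay) if jitter else delay


if __name__ == "__main__":
    for attempt in range(1, 6):
        print(attempt, backoff_delay(attempt, jitter=False))
```

With requests, the header value is available as response.headers.get("Retry-After").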
If you can share the average response latency you’re seeing from TMDB, I can suggest a more precise max_workers to keep your worker saturated while staying within 50 RPS.
Angelo Tata
10/05/2025, 4:10 PM
Marvin
10/05/2025, 4:10 PM
Marvin
10/05/2025, 4:10 PM
max_workers ≈ 128
- Enforce 50 RPS using a global rate limit
Create the global rate limit
prefect global-concurrency-limit create tmdb --limit 50 --slot-decay-per-second 50
- How-to: Global concurrency limits
- CLI: Global concurrency limit CLI
- Rate limits: Rate limits concept
Flow example (ThreadPoolTaskRunner + rate limit + pooled HTTP)
from hashlib import sha256
from pathlib import Path
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from prefect import flow, task
from prefect.concurrency.sync import rate_limit
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
# Build a shared session with connection pooling and retries
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=200,  # number of per-host connection pools to cache
    pool_maxsize=200,  # max connections kept in each host's pool (>= max_workers)
    max_retries=Retry(
        total=5,
        backoff_factor=1.5,
        status_forcelist=[429, 500, 502, 503, 504],
        respect_retry_after_header=True,
        raise_on_status=False,
    ),
)
# Note: these transport-level retries happen inside a single task attempt,
# so they are not counted against the "tmdb" rate limit
session.mount("http://", adapter)
session.mount("https://", adapter)
@task(persist_result=False, retries=3, retry_delay_seconds=5)
def fetch_and_save(url: str, out_dir: str):
    # Enforce 50 RPS globally; strict=True raises if the "tmdb" limit doesn't exist
    # instead of silently running unthrottled
    rate_limit("tmdb", strict=True)
    r = session.get(url, timeout=30)
    r.raise_for_status()
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    # Stable file name across runs (built-in hash() of a str is salted per process)
    (out / f"{sha256(url.encode()).hexdigest()}.json").write_bytes(r.content)
@flow(name="tmdb-downloads", task_runner=ThreadPoolTaskRunner(max_workers=128))
def tmdb_downloads(urls: list[str], out_dir: str):
    futures = [fetch_and_save.submit(url, out_dir) for url in urls]
    wait(futures)  # resolve all futures before the flow returns
# tmdb_downloads(urls, "/tmp/tmdb-json")
Why these numbers
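- max_workers=128 leaves headroom above the ~115 requests that must be in flight to sustain 50 RPS at 2.3s average latency (50 × 2.3 ≈ 115)
- The GCL + rate_limit("tmdb") ensures you never exceed 50 RPS even with more than 50 threads
- The HTTPAdapter pool avoids the connection churn ("Connection pool is full, discarding connection" warnings) you would get with requests' default pool size of 10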
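The ~115 figure is Little's law: requests in flight ≈ throughput × latency. A small sketch of that arithmetic using the numbers from this thread (50 RPS, 2.3s average latency, 100k URLs); required_in_flight and min_duration_s are hypothetical names.

```python
import math


def required_in_flight(rps: float, latency_s: float) -> int:
    """Little's law: concurrent requests needed to sustain `rps` at `latency_s`."""
    # round() first so binary float noise around an exact integer doesn't skew ceil()
    return math.ceil(round(rps * latency_s, 6))


def min_duration_s(n_requests: int, rps: float) -> float:
    """Lower bound on wall-clock time when throughput is capped at `rps`."""
    return n_requests / rps


if __name__ == "__main__":
    print(required_in_flight(50, 2.3))  # 115, so max_workers=128 leaves ~13 threads of headroom
    print(min_duration_s(100_000, 50))  # 2000.0 seconds, i.e. about 33 minutes
```

If latency rises, the same formula tells you how many more workers you need before the rate limit, rather than thread count, becomes the bottleneck again.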
macOS file descriptor limits
- With ~128 concurrent sockets + file writes, check and raise your open file limit if needed:
ulimit -n
# optionally raise for current shell
ulimit -n 4096
- Reference utility: prefect.utilities.filesystem.get_open_file_limit
Notes
- If you run multiple flows/workers against the same TMDB key, they will all share the same rate limit via the “tmdb” name
- For 100k tasks, consider chunked submission to keep memory usage steady (e.g., iterate in batches of a few thousand URLs)
This setup should deliver near-maximum throughput for TMDB without violating 50 RPS.
Angelo Tata
10/05/2025, 5:56 PM
Marvin
10/05/2025, 5:56 PM
Marvin
10/05/2025, 5:56 PM
from itertools import islice
from prefect import flow
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
def batched(iterable, n):
    # Yield successive lists of up to n items (itertools.batched does this on Python 3.12+)
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch
@flow(task_runner=ThreadPoolTaskRunner(max_workers=128))
def tmdb_downloads(urls: list[str], out_dir: str, batch_size: int = 2000):
    for batch in batched(urls, batch_size):
        futures = [fetch_and_save.submit(u, out_dir) for u in batch]
        wait(futures)  # waits until this batch finishes before moving on
- Pick batch_size ~ 1000–5000. With your setup (rate limit 50 RPS, ~2.3s latency, max_workers ~128), 2000 is a good starting point.
Option B: Sliding window (steady max_inflight)
- Keep a bounded number of tasks “in flight”; submit a new one each time one completes.
from prefect import flow
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
@flow(task_runner=ThreadPoolTaskRunner(max_workers=128))
def tmdb_downloads_streaming(urls: list[str], out_dir: str, max_inflight: int = 1000):
    urls_iter = iter(urls)
    inflight = []
    # Prime the window
    while len(inflight) < max_inflight:
        try:
            url = next(urls_iter)
        except StopIteration:
            break
        inflight.append(fetch_and_save.submit(url, out_dir))
    # Keep the window full until we exhaust URLs
    while inflight:
        # prefect.futures.wait blocks until all given futures finish or the timeout
        # expires, then returns (done, not_done); a short timeout keeps the window refilling
        result = wait(inflight, timeout=5)
        for fut in result.done:
            try:
                fut.result()  # surface any errors
            except Exception as e:
                # TODO: log/handle as needed
                pass
            inflight.remove(fut)
            # Refill
            try:
                url = next(urls_iter)
                inflight.append(fetch_and_save.submit(url, out_dir))
            except StopIteration:
                # Nothing left to add; continue draining
                pass
- Start with max_inflight ~ 1000. It doesn’t control RPS (your rate_limit("tmdb") does) but limits memory and the number of futures tracked at once.
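For comparison, here is the same sliding-window idea with the standard library's concurrent.futures, whose wait(..., return_when=FIRST_COMPLETED) returns as soon as any future finishes, so each completion immediately frees a slot for the next item. This is a plain-Python analogue rather than Prefect code; sliding_window and fake_fetch are hypothetical stand-ins for the flow and fetch_and_save.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def sliding_window(
    fn: Callable[[T], R], items: Iterable[T], max_inflight: int, max_workers: int
) -> tuple[list[R], int]:
    """Run fn over items with at most max_inflight futures outstanding.

    Returns (results in completion order, peak number of futures tracked at once).
    """
    it = iter(items)
    results: list[R] = []
    inflight: set[Future] = set()
    peak = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Prime the window
        for item in it:
            inflight.add(pool.submit(fn, item))
            if len(inflight) >= max_inflight:
                break
        while inflight:
            peak = max(peak, len(inflight))
            # Returns as soon as at least one future has finished
            done, inflight = wait(inflight, return_when=FIRST_COMPLETED)
            for fut in done:
                results.append(fut.result())  # re-raises the task's exception, if any
                # Refill one slot per completion until the input is exhausted
                try:
                    inflight.add(pool.submit(fn, next(it)))
                except StopIteration:
                    pass
    return results, peak


def fake_fetch(n: int) -> int:
    time.sleep(0.001)  # stand-in for an HTTP call
    return n * n


if __name__ == "__main__":
    results, peak = sliding_window(fake_fetch, range(500), max_inflight=50, max_workers=16)
    print(len(results), peak)
```

Because the stdlib wait can return on the first completion, no timeout polling is needed to keep the window full.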
Notes
- Keep ThreadPoolTaskRunner(max_workers=128) and rate_limit("tmdb") with a GCL created via:
prefect global-concurrency-limit create tmdb --limit 50 --slot-decay-per-second 50
- Batch size/max_inflight sizing:
- If you see memory pressure, lower the numbers.
- If you want faster startup and more smoothing against latency spikes, increase them moderately.
- With rate limiting at the top of the task, threads that can’t get a token will wait before making the HTTP call, so actual concurrent sockets stay near the steady state (~115). Your open-file usage stays reasonable, but you may still want ulimit -n 4096 for safety.