# ask-marvin
a
@Marvin I have a Prefect server running locally in Docker. I have a single prefect worker running outside of Docker. I have a Python flow that needs to execute 100,000 tasks, and they must run locally on a MacBook M1 with 32 GB of RAM. The tasks are I/O bound, as they have to download a JSON payload from the Internet and save it to a file. What is the best task runner for this scenario? What concurrency limits should I apply?
m
thought for 367.3 seconds
Great question — and nice setup. Assuming Prefect 3.x, here's what I'd recommend for 100k I/O-bound tasks running locally on a MacBook:

Summary
- Task runner: `ThreadPoolTaskRunner` (threads are ideal for I/O-bound work)
- Start with 64–128 concurrent tasks and tune from there
- Add a global concurrency limit (and optionally a rate limit) to protect your machine and the remote service
- Check/raise macOS open file limits to avoid "too many open files" errors

Why ThreadPoolTaskRunner
- Threads excel at I/O-bound work (like HTTP downloads); the GIL is released during blocking network and file calls, so it isn't a bottleneck here
- Easy to cap concurrency with `max_workers`
- Docs: Task runners (Prefect 3)

Example flow + task with concurrency guard
```python
import hashlib
from pathlib import Path
import requests

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import concurrency  # or rate_limit if you need RPS limiting

@task(persist_result=False)  # you're writing to files, so no need to persist task results
def fetch_and_save(url: str, out_dir: str):
    # Take one slot of the shared "download" limit: at most N of these
    # run at once, across all flow runs that use the same limit name
    with concurrency("download", occupy=1):
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        out_dir_p = Path(out_dir)
        out_dir_p.mkdir(parents=True, exist_ok=True)
        (out_dir_p / f"{hash(url)}.json").write_bytes(r.content)

@flow(name="bulk-download", task_runner=ThreadPoolTaskRunner(max_workers=64))
def bulk_download(urls: list[str], out_dir: str):
    # Submit tasks; Prefect will run up to max_workers concurrently
    for url in urls:
        fetch_and_save.submit(url, out_dir)

# bulk_download(urls, "/tmp/json")
```
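For a quick driver, assuming your URLs live one per line in a local `urls.txt` (the file name is just an example):
```python
from pathlib import Path

# Load the URL list and kick off the flow defined above
urls = Path("urls.txt").read_text().splitlines()
bulk_download(urls, "/tmp/json")
```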
Set a global concurrency limit
- This caps concurrency across all flow runs that use `concurrency("download")`
- Create it once (adjust 64 to suit):
```bash
prefect global-concurrency-limit create download --limit 64
```
- CLI help: Global concurrency limit CLI
- Guide: How-to: Global concurrency limits

Optional: rate limit (if the API has RPS rules)
- Create a rate-limited GCL with a burst and decay (tokens/sec). For example, burst 100 with 50 req/s:
```bash
prefect global-concurrency-limit create download --limit 100 --slot-decay-per-second 50
```
- In your task, use:
```python
from prefect.concurrency.sync import rate_limit

with rate_limit("download"):
    ...
```
- Docs: Rate limits (concepts)

Choosing concurrency numbers
- Start with `max_workers=64` and benchmark. On an M1 with 32 GB of RAM, 64–128 is usually safe for I/O-bound network and file operations
- Watch CPU, memory, and network saturation; adjust up or down
- Keep the GCL `--limit` equal to your `max_workers` (or slightly lower) to enforce a hard cap across runs

macOS open file limits
- Each concurrent task will consume at least a socket and a file handle
- Check the limit from Python (Prefect utility):
```python
from prefect.utilities.filesystem import get_open_file_limit
print(get_open_file_limit())
```
- Or in your shell:
```bash
ulimit -n
```
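If you prefer to check and raise the soft limit from the Python process itself, here's a minimal sketch using only the standard-library `resource` module (an addition of mine, not a Prefect API):
```python
import resource

# Current soft/hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft}, hard={hard}")

# Raise the soft limit toward the hard limit (process-local; raising the
# hard limit itself generally requires elevated privileges)
target = min(10240, hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
```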
- If you raise concurrency above ~128, you may need to increase this limit (e.g., `ulimit -n 10240` for the current shell/session). macOS also has system-wide limits via `launchctl` if you need a long-term change
- API reference: prefect.utilities.filesystem

Notes
- Work pool concurrency limits affect concurrent flow runs, not per-task concurrency within a single flow. With one worker and one flow, the task runner and the GCL are the main controls
- With 100k tasks, consider turning off result persistence (as shown) to reduce orchestration overhead

If you share what URLs/service you're hitting (and any RPS limits), I can help you pick an initial `max_workers` and rate limit that's safe and fast.
a
@Marvin, I am calling the TMDB API, which has a rate limit of 50 requests per second.
m
thought for 55.8 seconds
Perfect — with TMDB at 50 requests/second, here's a concrete setup that will be fast and safe.

What to use
- Task runner: `ThreadPoolTaskRunner` (best for I/O-bound work)
- Concurrency: `max_workers` of 64–128
- Rate limit: a Prefect Global Concurrency Limit (GCL) at 50 RPS, used via `rate_limit("tmdb")`

Create the global rate limit
- This enforces 50 requests per second across all runs that use the same limit name:
```bash
prefect global-concurrency-limit create tmdb --limit 50 --slot-decay-per-second 50
```
- Docs: How-to: Global concurrency limits, CLI reference, Rate limits concept

Flow/task example
```python
import hashlib
from pathlib import Path
import requests

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import rate_limit

@task(persist_result=False, retries=3, retry_delay_seconds=5)
def fetch_and_save(url: str, out_dir: str):
    # Enforce TMDB rate limit globally
    with rate_limit("tmdb"):
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        (out / f"{hash(url)}.json").write_bytes(r.content)

@flow(name="tmdb-downloads", task_runner=ThreadPoolTaskRunner(max_workers=128))
def tmdb_downloads(urls: list[str], out_dir: str):
    for url in urls:
        fetch_and_save.submit(url, out_dir)

# tmdb_downloads(urls, "/tmp/tmdb-json")
```
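A hypothetical driver for the flow above (the movie-details endpoint and query-param API key are assumptions on my part; adapt to the TMDB endpoints and auth style you actually use):
```python
# Hypothetical: build v3 movie-details URLs from a range of movie IDs
API_KEY = "your-tmdb-api-key"  # placeholder
urls = [
    f"https://api.themoviedb.org/3/movie/{movie_id}?api_key={API_KEY}"
    for movie_id in range(1, 100_001)
]
tmdb_downloads(urls, "/tmp/tmdb-json")
```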
Choosing the numbers
- `max_workers`: 64–128 is a good starting range on an M1/32GB for I/O-heavy work. More threads won't exceed 50 RPS (the GCL enforces it) but can help keep the pipeline full under variable latency.
- GCL: `--limit 50 --slot-decay-per-second 50` caps you at ~50 requests per second (with a natural burst of up to 50). If TMDB allows small bursts, this is fine; otherwise, keep it as-is.

macOS open file limits
- 64–128 concurrent requests plus file writes can hit the default macOS file limit.
- Check:
```bash
ulimit -n
```
- If needed, temporarily raise it in your shell before starting the worker (e.g., `ulimit -n 4096`). For persistent changes, use `launchctl` configuration.

Optional tips
- If you see 429s, keep the `retries` above or add backoff logic on 429.
- Consider batching or chunking URL submission to keep memory steady when dispatching 100k tasks.
- You can also set a work pool concurrency limit if you want to limit concurrent flow runs, but it won't change per-task throughput inside a single flow.

If you can share the average response latency you're seeing from TMDB, I can suggest a more precise `max_workers` to keep your worker saturated while staying within 50 RPS.
a
@Marvin, the average response latency is around 2.3 seconds (averaged over 200 requests)
m
thought for 37.6 seconds
Great — with ~2.3s average latency and a 50 RPS cap, you'll want enough parallelism to keep ~115 requests in flight.

Rule of thumb
- Required concurrency ≈ RPS × latency ≈ 50 × 2.3 ≈ 115 (Little's law); a quick sanity check follows the command below
- Recommended: ThreadPoolTaskRunner with `max_workers` ≈ 128
- Enforce 50 RPS using a global rate limit

Create the global rate limit
```bash
prefect global-concurrency-limit create tmdb --limit 50 --slot-decay-per-second 50
```
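As a quick sanity check on the sizing, in plain Python (values from this thread; nothing Prefect-specific):
```python
rps = 50          # TMDB rate limit (requests/second)
latency_s = 2.3   # measured average response latency
in_flight = rps * latency_s   # Little's law: requests in flight at steady state
print(round(in_flight))       # 115 -> max_workers=128 leaves some headroom
```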
- How-to: Global concurrency limits
- CLI: Global concurrency limit CLI
- Rate limits: Rate limits concept

Flow example (ThreadPoolTaskRunner + rate limit + pooled HTTP)
```python
import hashlib
from pathlib import Path
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import rate_limit

# Build a shared session with connection pooling and retries
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=200,  # number of connection pools (one per host) to cache
    pool_maxsize=200,      # max connections kept per host pool
    max_retries=Retry(
        total=5,
        backoff_factor=1.5,
        status_forcelist=[429, 500, 502, 503, 504],
        respect_retry_after_header=True,
        raise_on_status=False,
    ),
)
session.mount("http://", adapter)
session.mount("https://", adapter)

@task(persist_result=False, retries=3, retry_delay_seconds=5)
def fetch_and_save(url: str, out_dir: str):
    # Enforce 50 RPS globally; strict=True raises if the "tmdb" limit doesn't
    # exist, instead of silently proceeding without rate limiting
    with rate_limit("tmdb", strict=True):
        r = session.get(url, timeout=30)
        r.raise_for_status()
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        (out / f"{hash(url)}.json").write_bytes(r.content)

@flow(name="tmdb-downloads", task_runner=ThreadPoolTaskRunner(max_workers=128))
def tmdb_downloads(urls: list[str], out_dir: str):
    for url in urls:
        fetch_and_save.submit(url, out_dir)

# tmdb_downloads(urls, "/tmp/tmdb-json")
```
Why these numbers
- `max_workers=128` keeps ~115 requests in flight to saturate 50 RPS at 2.3s latency
- The GCL + `rate_limit("tmdb")` ensures you never exceed 50 RPS even if threads > 50
- The HTTPAdapter pool avoids bottlenecking on the default small pool size (10 connections per host)
- At a sustained 50 RPS, 100k downloads take about 100,000 / 50 = 2,000 seconds, i.e., roughly 33 minutes end to end

macOS file descriptor limits
- With ~128 concurrent sockets + file writes, check and raise your open file limit if needed:
```bash
ulimit -n
# optionally raise for current shell
ulimit -n 4096
```
- Reference utility: `prefect.utilities.filesystem.get_open_file_limit`

Notes
- If you run multiple flows/workers against the same TMDB key, they will all share the same rate limit via the "tmdb" name
- For 100k tasks, consider chunked submission to keep memory usage steady (e.g., iterate in batches of a few thousand URLs)

This setup should deliver near-maximum throughput for TMDB without violating 50 RPS.
a
@Marvin, how would I implement chunked submission?
m
thought for 41.5 seconds
Two easy patterns work well in Prefect 3.x to avoid holding 100k futures in memory while still keeping throughput high.

Option A: Fixed-size batching
- Submit a batch, wait for it to finish, then submit the next. Simple and memory-friendly.
```python
from itertools import islice
from prefect import flow
from prefect.futures import wait

# assumes fetch_and_save is the rate-limited @task defined earlier in the thread

def batched(iterable, n):
    # yield successive lists of n items (Python 3.12+ has itertools.batched built in)
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch

@flow
def tmdb_downloads(urls: list[str], out_dir: str, batch_size: int = 2000):
    for batch in batched(urls, batch_size):
        futures = [fetch_and_save.submit(u, out_dir) for u in batch]
        wait(futures)  # waits until this batch finishes before moving on
```
- Pick `batch_size` ~ 1000–5000. With your setup (50 RPS rate limit, ~2.3s latency, `max_workers` ~128), 2000 is a good starting point.

Option B: Sliding window (steady max_inflight)
- Keep a bounded number of tasks "in flight"; submit a new one each time one completes.
```python
from prefect import flow
from prefect.futures import wait

# assumes fetch_and_save is the same rate-limited @task as before

@flow
def tmdb_downloads_streaming(urls: list[str], out_dir: str, max_inflight: int = 1000):
    urls_iter = iter(urls)
    inflight = []

    # Prime the window
    while len(inflight) < max_inflight:
        try:
            url = next(urls_iter)
        except StopIteration:
            break
        inflight.append(fetch_and_save.submit(url, out_dir))

    # Keep the window full until we exhaust URLs
    while inflight:
        result = wait(inflight, timeout=60)  # returns some done + not_done
        for fut in result.done:
            try:
                fut.result()  # surface any errors
            except Exception:
                # TODO: log/handle as needed
                pass
            inflight.remove(fut)

            # Refill
            try:
                url = next(urls_iter)
                inflight.append(fetch_and_save.submit(url, out_dir))
            except StopIteration:
                # Nothing left to add; continue draining
                pass
```
- Start with `max_inflight` ~ 1000. It doesn't control RPS (your `rate_limit("tmdb")` does), but it limits memory and the number of futures tracked at once.

Notes
- Keep `ThreadPoolTaskRunner(max_workers=128)` and `rate_limit("tmdb")` with a GCL created via:
```bash
prefect global-concurrency-limit create tmdb --limit 50 --slot-decay-per-second 50
```
- Batch size / max_inflight sizing:
  - If you see memory pressure, lower the numbers.
  - If you want faster startup and more smoothing against latency spikes, increase them moderately.
- With rate limiting at the top of the task, threads that can't get a token will wait before making the HTTP call, so actual concurrent sockets stay near the steady state (~115). Your open-file usage stays reasonable, but you may still want `ulimit -n 4096` for safety.