Tom Han
09/02/2025, 5:37 PM
Marvin
09/02/2025, 5:37 PM
Marvin
09/02/2025, 5:41 PM
```python
from prefect import flow, task, tags
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner


@task(
    name="process-video",
    retries=3,
    retry_delay_seconds=30,
    persist_result=False,  # don't write return values to result storage
    # Template string: Prefect fills {video_id} from the task's arguments
    task_run_name="process-video-{video_id}",
)
def process_video(video_id: str):
    # Your ingest logic here
    return {"video_id": video_id, "status": "ok"}


@flow(name="bulk-video-ingest", task_runner=ThreadPoolTaskRunner(max_workers=32))
def bulk_video_ingest(video_ids: list[str], max_inflight: int = 500):
    in_flight = set()
    completed = 0  # counts finished task runs, failed ones included

    # Tag tasks so you can enforce a tag-based concurrency limit server-side
    with tags("video-ingest"):
        for vid in video_ids:
            fut = process_video.submit(vid)
            in_flight.add(fut)

            # Backpressure: when the window is full, wait for at least one to finish
            if len(in_flight) >= max_inflight:
                for done in as_completed(list(in_flight)):
                    in_flight.remove(done)
                    completed += 1
                    break

    # Drain remaining tasks
    for done in as_completed(list(in_flight)):
        in_flight.remove(done)
        completed += 1

    return completed
```
Why this works
- You still get per-item observability: each video is a task run with its own state, logs, and name.
- The “window” avoids overwhelming the scheduler/DB with 20k pending tasks at once.
- Server-side limits ensure your workers and external systems aren’t overloaded.
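The windowing idea itself does not depend on Prefect. As a minimal stdlib sketch (not Prefect's API), the same backpressure loop can be written with `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)`, which blocks until at least one future in the window finishes instead of building a fresh `as_completed` iterator over the whole window each time a slot is needed. `process_video` and `bulk_ingest` here are hypothetical stand-ins; `max_workers` bounds how many items actually run in parallel, while `max_inflight` bounds how many are submitted but not yet collected.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def process_video(video_id: str) -> dict:
    # Hypothetical stand-in for the real ingest work
    time.sleep(0.001)
    return {"video_id": video_id, "status": "ok"}


def bulk_ingest(video_ids, max_inflight=500, max_workers=32):
    """Submit through a bounded window; return (results, peak window size)."""
    in_flight = set()
    results = []
    peak = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for vid in video_ids:
            # Backpressure: when the window is full, block until >= 1 future finishes
            if len(in_flight) >= max_inflight:
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
            in_flight.add(pool.submit(process_video, vid))
            peak = max(peak, len(in_flight))
        # Drain: wait for everything still in the window
        done, _ = wait(in_flight)
        results.extend(f.result() for f in done)
    return results, peak
```

For example, `bulk_ingest([f"vid-{i}" for i in range(200)], max_inflight=20, max_workers=4)` returns 200 results with a peak window of 20. Unlike the Prefect flow, `f.result()` here re-raises any exception raised by `process_video`.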
Controls to turn on
- Tag-based task concurrency limit (throttles task runs server-side across all flow runs)
  - Create it once; it then applies to every task run carrying the video-ingest tag:
    prefect concurrency-limit create video-ingest 200
  - Docs: Global concurrency limits
- Work pool concurrency limit (caps concurrent flow runs on a pool)
  prefect work-pool set-concurrency-limit <your-pool> 200
- Task-level rate limiting (if hitting an API or shared resource)
  - Concept: Rate limits
- Futures utilities: as_completed and wait (reference: Futures API)
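These limits are enforced by the Prefect server and configured with the CLI commands above, so there is nothing extra to code. Note that in the flow above, `ThreadPoolTaskRunner(max_workers=32)` already caps a single run at 32 concurrently executing tasks, so a tag limit of 200 mainly binds when several flow runs share the video-ingest tag. To make the "at most N hold a slot at once" semantics concrete, here is a local analogy using a counting semaphore. `ConcurrencyCap` and `run` are hypothetical helpers for illustration; this is not how Prefect implements its limits.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyCap:
    """Hypothetical local stand-in for a server-side slot limit."""

    def __init__(self, limit: int):
        self._slots = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __enter__(self):
        self._slots.acquire()  # block until a slot is free
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc):
        with self._lock:
            self.active -= 1
        self._slots.release()  # hand the slot to the next waiter
        return False


def run(n_items: int, workers: int, limit: int) -> int:
    """Run n_items on `workers` threads behind a cap; return peak concurrency."""
    cap = ConcurrencyCap(limit)

    def work(_: int) -> None:
        with cap:
            time.sleep(0.005)  # simulated call to an external system

    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(work, range(n_items)))
    return cap.peak
```

Even with more threads than slots, for example `run(60, workers=16, limit=5)`, no more than 5 items are ever inside the capped section at the same time.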
Additional tips
- Prefer tasks-per-item over subflows-per-item for huge fan-out. Subflows create separate flow runs (more DB records) and more overhead. Use subflows only when you need stronger isolation or a different deployment/runtime per item.
- Skip result persistence: persist_result=False on the task keeps large payloads out of the result store.
- Name runs clearly: task_run_name with the video ID makes it easy to find specific items in the UI.
- Logging: keep logs concise; very verbose per-item logging can add load to your DB. Consider INFO-level summaries plus WARN/ERROR for issues.
- If “20k at once” is common and very heavy, consider a queue-backed pattern (e.g., enqueue IDs to SQS/Kafka; run a steady number of short flows/tasks pulling from the queue). This decouples submission from execution and keeps your orchestration graph small, at the cost of less “single run” aggregation.
Marvin
09/02/2025, 5:41 PM
max_workers, pool limits, and whether ProcessPool vs ThreadPool is better.
If you share those details, I can suggest exact limit values and the best task runner configuration for your setup.
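The queue-backed alternative from the tips above can be sketched in-process: a producer enqueues IDs and a fixed number of consumers pull from the queue. This is a rough analogy only, assuming Python's `queue.Queue` in place of SQS or Kafka and plain threads in place of the short flows or tasks; the bounded `maxsize` plays the role of the submission window. `producer`, `consumer`, and `run_pipeline` are hypothetical names, and a real deployment would also need a durable external queue, acknowledgements, and retries, which this sketch omits.

```python
import queue
import threading


def producer(q: queue.Queue, video_ids, n_consumers: int) -> None:
    for vid in video_ids:
        q.put(vid)  # blocks while the queue is full (bounded backlog)
    for _ in range(n_consumers):
        q.put(None)  # one sentinel per consumer signals shutdown


def consumer(q: queue.Queue, results: list, lock: threading.Lock) -> None:
    while True:
        vid = q.get()  # blocks until an item or a sentinel arrives
        if vid is None:
            return  # sentinel: no more work for this consumer
        outcome = {"video_id": vid, "status": "ok"}  # stand-in for real ingest work
        with lock:
            results.append(outcome)


def run_pipeline(video_ids, n_consumers: int = 4, maxsize: int = 100) -> list:
    q = queue.Queue(maxsize=maxsize)
    results, lock = [], threading.Lock()
    workers = [
        threading.Thread(target=consumer, args=(q, results, lock))
        for _ in range(n_consumers)
    ]
    for w in workers:
        w.start()
    producer(q, video_ids, n_consumers)
    for w in workers:
        w.join()
    return results
```

Because every ID is enqueued before the sentinels and the queue is FIFO, each consumer finishes all available work before receiving its single shutdown sentinel.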