# ask-marvin
@Marvin I have a workflow where I might be ingesting 20 or 20,000 videos at once. If I use a task for every single video, I fear I might clog the Prefect database. What is the best way to handle this type of data ingestion in Prefect? I don't want to introduce unnecessary batching if possible, while still maintaining observability.
Great question. High fan-out can be powerful, but it's smart to think about database load and observability up front. In Prefect 3.x, you can process large sets without "clogging" the database by combining controlled concurrency with lightweight per-item tasks.

Recommended pattern: windowed `.submit()` with limits
- Submit a task for each video, but keep only a bounded number "in flight" at a time. This gives you parallelism without dumping 20k tasks into the system at once.
- Use tags plus concurrency limits to apply backpressure from the server side too.
- Avoid persisting large results, and keep logs reasonable for DB health.
- Give the flow an explicit task runner (`ThreadPoolTaskRunner` with a `max_workers` cap) so local parallelism is bounded.

Example
```python
from prefect import flow, task, tags
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task(
    name="process-video",
    retries=3,
    retry_delay_seconds=30,
    persist_result=False,  # don't write return values to result storage
    task_run_name="process-video-{video_id}",  # template filled from the task's parameters
)
def process_video(video_id: str):
    # Your ingest logic here
    return {"video_id": video_id, "status": "ok"}

@flow(name="bulk-video-ingest", task_runner=ThreadPoolTaskRunner(max_workers=32))
def bulk_video_ingest(video_ids: list[str], max_inflight: int = 500):
    in_flight = set()
    counts = {"completed": 0, "failed": 0}

    def record(fut):
        # Drop a finished future from the window and tally its final state
        in_flight.remove(fut)
        fut.wait()  # already finished; makes the final state available locally
        counts["completed" if fut.state.is_completed() else "failed"] += 1

    # Tag tasks so a tag-based concurrency limit on "video-ingest" applies server-side
    with tags("video-ingest"):
        for vid in video_ids:
            in_flight.add(process_video.submit(vid))

            # Backpressure: when the window is full, wait for one task to finish
            if len(in_flight) >= max_inflight:
                record(next(as_completed(list(in_flight))))

        # Drain remaining tasks
        for done in as_completed(list(in_flight)):
            record(done)

    return counts
```
Why this works
- You still get per-item observability: each video is a task run with its own state, logs, and name.
- The window caps submitted-but-unfinished tasks at `max_inflight`, so you never push all 20k runs at the scheduler and database at once.
- Server-side limits help keep your workers and external systems from being overloaded.

Controls to turn on
- Tag-based task concurrency limit (enforced server-side across all flows). Create it once and reuse it with your `video-ingest` tag:
```bash
prefect concurrency-limit create video-ingest 200
```
- Docs: Tag-based concurrency limits
- Work pool concurrency limit (caps the number of concurrent flow runs on a pool):
```bash
prefect work-pool set-concurrency-limit <your-pool> 200
```
- Task-level rate limiting (if you're hitting an API or shared resource). Concept: Rate limits
- Futures utilities: `as_completed` and `wait`. Reference: Futures API

Additional tips
- Prefer tasks-per-item over subflows-per-item for huge fan-out. Each subflow creates a separate flow run (more DB records) and adds overhead. Use subflows only when you need stronger isolation or a different deployment/runtime per item.
- Keep results small: `persist_result=False` on the task avoids storing large payloads in the result store.
- Name runs clearly: a `task_run_name` that includes the video ID makes it easy to find specific items in the UI.
- Logging: keep logs concise, because very verbose per-item logging adds load to your DB. Consider INFO-level summaries plus WARNING/ERROR for issues.
- If "20k at once" is common and very heavy, consider a queue-backed pattern: enqueue IDs to SQS/Kafka and run a steady number of short flows or tasks that pull from the queue. This decouples submission from execution and keeps your orchestration graph small, at the cost of less "single run" aggregation.
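The queue-backed idea can be sketched with the standard library. Here `queue.Queue` stands in for SQS/Kafka, and a fixed pool of worker threads stands in for the steady set of short flows pulling from it. This only illustrates the decoupling. `run_queue_workers` and the `handle` callback are hypothetical names. A real setup would use the broker's client with long-lived consumers and run each worker as its own flow or deployment.

```python
import queue
import threading


def run_queue_workers(video_ids, handle, num_workers=4):
    """Drain a queue of IDs with a fixed number of workers; return (done, failed)."""
    q = queue.Queue()
    for vid in video_ids:
        q.put(vid)  # producer: enqueueing is cheap and independent of execution

    done, failed = [], []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                vid = q.get_nowait()
            except queue.Empty:
                return  # queue drained: this worker exits
            try:
                handle(vid)  # hypothetical per-video processing callback
                with lock:
                    done.append(vid)
            except Exception:
                with lock:
                    failed.append(vid)  # record failures instead of stopping the worker
            finally:
                q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return done, failed


if __name__ == "__main__":
    ok, bad = run_queue_workers([f"vid-{i}" for i in range(10)], handle=lambda vid: None)
    print(len(ok), "done,", len(bad), "failed")
```

The number of workers, not the number of IDs, sets the load. The trade-off is that success and failure now live in `done`/`failed` (or your broker's dead-letter queue) instead of a single flow run's graph.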
Open questions to tailor the guidance
- Are you on Prefect Cloud or self-hosted Server? (This affects recommended DB and concurrency settings.)
- Do you need per-video run detail in the UI, or is a high-level success/failure summary enough?
- How heavy is each video job (CPU/GPU/network), and where do you run workers (K8s, ECS, VMs)? This informs `max_workers`, pool limits, and whether a process pool or a thread pool is the better fit.

If you share those details, I can suggest exact limit values and the best task runner configuration for your setup.