<@ULVA73B9P> how should we think about task groups...
# ask-marvin
@Marvin how should we think about task groups in prefect? provide recommended prefect flow/task setup overview concisely using tool retrieving prefect docs and guides. e.g. we have 1000 images. we need to do 4 things for each image. Retrieve, Process async, Poll for processing completion, Send somewhere. Concurrency limits apply.
Short answer: Prefect 3.x doesn’t have “task groups” as a first-class object. Instead, group related steps by:
- Encapsulating them in a small Python helper that wires tasks together, or
- Using a subflow when you want separate run-level visibility/ownership.

For your case (1000 images, steps: Retrieve → Kick off async processing → Poll for completion → Send), a clean pattern is:
- Model each step as a task.
- Encapsulate the per-image pipeline in a helper function that chains those tasks with `.submit` and returns the final future.
- Fan out over images with a list comprehension (or mapping if you prefer), while enforcing concurrency via:
  - Task runner limits (`max_workers`)
  - Server-side global concurrency limits by tag (for shared resources like an external API)
  - Optional rate limits if you must throttle requests

Example (Prefect 3.x):
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import time

# IO-bound: add a tag used for server-side concurrency limiting (see docs link below)
@task(tags={"external-api"})
def retrieve(image_id: str) -> bytes:
    # fetch from storage
    ...

@task(tags={"external-api"})
def start_processing(image_bytes: bytes) -> str:
    # submit job to external service, return job_id
    ...

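# Retries tolerate transient API errors; the task itself polls in a loop with a fixed sleep.
# Note: a retry reruns the whole task, so a timeout restarts the polling loop from the beginning.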
@task(retries=5, retry_delay_seconds=10, tags={"external-api"})
def poll_until_done(job_id: str) -> dict:
    deadline = time.time() + 60 * 15  # e.g., 15m timeout
    while time.time() < deadline:
        status = ...  # call external service
        if status["done"]:
            return status["result"]
        time.sleep(3)  # fixed polling interval (not a backoff); tune to your SLA
    raise TimeoutError(f"Job {job_id} did not complete in time")

@task(tags={"delivery"})
def send_somewhere(result: dict) -> None:
    # publish/store/etc.
    ...

# Helper that “groups” the per-image steps by wiring futures
def process_one_image(image_id: str):
    img = retrieve.submit(image_id)
    job = start_processing.submit(img)
    result = poll_until_done.submit(job)
    return send_somewhere.submit(result)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def batch_process_images(image_ids: list[str]) -> None:
    futures = [process_one_image(i) for i in image_ids]
    # Wait for every pipeline to finish; in Prefect 3.x, resolve submitted futures before the flow returns
    for f in futures:
        f.result()

# Example invocation
# batch_process_images(image_ids=[...])
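The polling task above sleeps a fixed 3 seconds between checks. If you would rather back off between checks, a minimal stdlib-only sketch could look like the following. Everything in it is an assumption for illustration: `poll_with_backoff`, its parameters, and the `check` callable (which would wrap your external service's status call and return `None` while the job is still running) are hypothetical names, not part of Prefect.

```python
import time
from typing import Callable, Optional


def poll_with_backoff(
    check: Callable[[], Optional[dict]],
    timeout_s: float = 900.0,
    initial_delay_s: float = 1.0,
    max_delay_s: float = 30.0,
    factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Call check() until it returns a result or timeout_s elapses.

    check() is a hypothetical callable: it returns None while the job is
    still running and a result dict once the job is done.
    """
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        result = check()
        if result is not None:
            return result
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"No result within {timeout_s} seconds")
        # Never sleep past the deadline.
        sleep(min(delay, remaining))
        # Grow the delay geometrically, capped at max_delay_s.
        delay = min(delay * factor, max_delay_s)
```

Inside `poll_until_done`, the `while` loop could be swapped for a call to this helper with your own status check. The `sleep` and `clock` parameters exist only so the loop can be exercised in tests without real waiting.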
Why this works well
- Clear “task group” boundary: the helper function serves as your logical group without special framework features.
- Concurrency control:
  - Local: `ThreadPoolTaskRunner(max_workers=...)` caps concurrent task execution inside the flow run (good for IO-bound tasks).
  - Server-side: apply a single tag (e.g., "external-api") to all steps that hit your external service, then create a global concurrency limit for that tag (e.g., 20) so only N tasks across all flows run at once.
  - Optional: define rate limits if you must throttle requests per second/minute.
- Robustness: retries and timeouts on the polling task.

When to use subflows instead
- If you want each image’s pipeline to appear as its own subflow run (for visibility, ownership, or independent retries), decorate `process_one_image` as `@flow` and call it for each image. Trade-off: more orchestration overhead.

Fan-out alternatives
- Mapping: you can also express fan-out with mapping and the `unmapped` utility if you prefer that style for pure per-item tasks. Many teams find `.submit` in list comprehensions more explicit and flexible for multi-step per-item pipelines.

Key docs and guides
- Task runners (local concurrency): Task runners
- Global concurrency limits by tag (server-side): Global concurrency limits
- Rate limits: Rate limits
- Mapping/fan-out background: Task mapping scales
- Workflow design patterns (subflows vs. composition): Workflow design patterns

If you share your target concurrency for the external processor (e.g., max concurrent jobs and/or requests per second), I can suggest concrete tag limits and task runner settings.
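To build intuition for how the two concurrency layers interact, here is a small stdlib-only analogy. It is not how Prefect implements limits: the worker pool stands in for the task runner's `max_workers`, and a semaphore stands in for a tag limit on the external API, but only within a single process, whereas a server-side limit applies across all flow runs. The names `run_batch`, `ConcurrencyProbe`, and `call_external_api` are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe:
    """Records the peak number of threads inside a section at once."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __enter__(self) -> "ConcurrencyProbe":
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc) -> bool:
        with self._lock:
            self.active -= 1
        return False


def run_batch(n_items: int, max_workers: int, api_limit: int,
              call_seconds: float = 0.01) -> tuple[list[int], int]:
    # Stand-in for a tag limit: at most api_limit calls in flight.
    api_slots = threading.BoundedSemaphore(api_limit)
    probe = ConcurrencyProbe()

    def call_external_api(item: int) -> int:
        with api_slots:          # wait for a free "external-api" slot
            with probe:          # measure concurrency inside the limit
                time.sleep(call_seconds)  # simulated IO
        return item * 2

    # Stand-in for the task runner: at most max_workers threads overall.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(call_external_api, range(n_items)))
    return results, probe.peak
```

With `max_workers=8` and `api_limit=3`, up to eight workers are busy at once, but no more than three are ever inside the simulated API call. The tighter of the two limits is the one that governs the shared resource.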