KG
08/25/2025, 3:07 PM
Marvin
08/25/2025, 3:07 PM
Marvin
08/25/2025, 3:13 PM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import time

# IO-bound: add a tag used for server-side concurrency limiting (see docs link below)
@task(tags={"external-api"})
def retrieve(image_id: str) -> bytes:
    # fetch from storage
    ...

@task(tags={"external-api"})
def start_processing(image_bytes: bytes) -> str:
    # submit job to external service, return job_id
    ...

# Retries help tolerate transient API errors; the task itself polls with sleeps.
# Note: the TimeoutError below is also a task failure, so it triggers the retries too.
@task(retries=5, retry_delay_seconds=10, tags={"external-api"})
def poll_until_done(job_id: str) -> dict:
    deadline = time.time() + 60 * 15  # e.g., 15m timeout
    while time.time() < deadline:
        status = ...  # placeholder: call external service
        if status["done"]:
            return status["result"]
        time.sleep(3)  # fixed polling interval; tune to your SLA
    raise TimeoutError(f"Job {job_id} did not complete in time")

@task(tags={"delivery"})
def send_somewhere(result: dict) -> None:
    # publish/store/etc.
    ...

# Helper that “groups” the per-image steps by wiring futures:
# passing a future to .submit makes the next task wait for it.
def process_one_image(image_id: str):
    img = retrieve.submit(image_id)
    job = start_processing.submit(img)
    result = poll_until_done.submit(job)
    return send_somewhere.submit(result)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=50))
def batch_process_images(image_ids: list[str]) -> None:
    futures = [process_one_image(i) for i in image_ids]
    # Block until done (optional; remove if you want the flow to finish early)
    for f in futures:
        f.result()

# Example invocation
# batch_process_images(image_ids=[...])
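The polling loop in poll_until_done sleeps a constant 3 seconds between checks. If you would rather back off progressively, here is a minimal standard-library sketch of exponential backoff with a deadline. The function name poll_with_backoff, its parameters, and the injectable clock/sleep hooks are all hypothetical and not part of Prefect; check stands in for whatever call returns the external service's status dict.

```python
import time
from typing import Any, Callable


def poll_with_backoff(
    check: Callable[[], dict],
    timeout_s: float = 900.0,        # overall deadline (15 minutes here)
    initial_delay_s: float = 1.0,    # first wait between checks
    max_delay_s: float = 30.0,       # cap on the wait between checks
    factor: float = 2.0,             # multiplier applied after each miss
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call check() until it reports done, waiting longer after each miss."""
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        status = check()
        if status["done"]:
            return status["result"]
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError("job did not complete in time")
        # Never sleep past the deadline.
        sleep(min(delay, remaining))
        delay = min(delay * factor, max_delay_s)
```

Inside a task you would call it with a closure over the job id, e.g. poll_with_backoff(lambda: get_status(job_id)), where get_status is your own (hypothetical) client call. The clock and sleep parameters exist only to make the loop testable without real waiting.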
Why this works well
- Clear “task group” boundary: The helper function serves as your logical group without special framework features.
- Concurrency control:
  - Local: ThreadPoolTaskRunner(max_workers=...) caps concurrent task execution inside the flow run (good for IO-bound tasks).
  - Server-side: Apply a single tag (e.g., "external-api") to every task that hits your external service, then create a server-side concurrency limit on that tag (e.g., 20) so that at most 20 tagged task runs execute at once across all flows.
  - Optional: Define rate limits if you must throttle requests per second/minute.
- Robustness: retries on the polling task absorb transient API errors, and its in-task deadline bounds how long a single attempt can poll.
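To make the two layers of concurrency control concrete, here is a standard-library analogy, not Prefect code. The thread pool plays the role of ThreadPoolTaskRunner(max_workers=...), and a semaphore plays the role of a limit on the "external-api" tag. All names here (SlotLimiter, call_external_api, run_batch, the constants) are hypothetical, and the semaphore only limits threads within one process, whereas a server-side limit applies across flow runs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 8         # local cap, like max_workers on the task runner
EXTERNAL_API_LIMIT = 3  # shared cap, like a limit of 3 on one tag


class SlotLimiter:
    """Context manager that admits at most `limit` holders at a time."""

    def __init__(self, limit: int) -> None:
        self._slots = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self._in_flight = 0
        self.peak = 0  # highest number of simultaneous holders observed

    def __enter__(self) -> "SlotLimiter":
        self._slots.acquire()  # blocks while all slots are taken
        with self._lock:
            self._in_flight += 1
            self.peak = max(self.peak, self._in_flight)
        return self

    def __exit__(self, *exc_info) -> bool:
        with self._lock:
            self._in_flight -= 1
        self._slots.release()
        return False  # do not swallow exceptions


def call_external_api(item: int, limiter: SlotLimiter) -> int:
    with limiter:
        time.sleep(0.01)  # stand-in for the real request
        return item * 2


def run_batch(items: list[int], limiter: SlotLimiter) -> list[int]:
    # Up to MAX_WORKERS threads run, but only EXTERNAL_API_LIMIT of them
    # can be inside call_external_api's critical section at once.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        return list(pool.map(lambda i: call_external_api(i, limiter), items))
```

The point of the sketch is that the two caps are independent: the pool size bounds how much work is in progress locally, while the slot limit bounds pressure on the external service regardless of how large the pool is.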
When to use subflows instead
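- If you want each image’s pipeline to appear as its own subflow run (for visibility, ownership, or independent retries), decorate process_one_image as @flow and call it for each image. Trade-off: more orchestration overhead, and a direct subflow call blocks until that subflow finishes, so images run one after another unless you dispatch the subflows concurrently (for example, with async flows).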
Fan-out alternatives
- Mapping: you can also express fan-out with mapping (.map) and the unmapped utility if you prefer that style for pure per-item tasks. Many teams find .submit in list comprehensions more explicit and flexible for multi-step per-item pipelines.
Key docs and guides
- Task runners (local concurrency): Task runners
- Global concurrency limits by tag (server-side): Global concurrency limits