David Wood
09/15/2025, 12:25 PM
Marvin
09/15/2025, 12:25 PM
Marvin
09/15/2025, 12:25 PM
flow.from_source(...).deploy(...)
instead of 2.x build commands
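For example, a minimal 3.x deployment might look like this (the source path, entrypoint, and work pool name are placeholders):
from prefect import flow

flow.from_source(
    source=".",                             # local project root, or a git URL
    entrypoint="flows/my_flow.py:my_flow",  # file_path:flow_name
).deploy(
    name="my-deployment",
    work_pool_name="my-pool",
)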
Fire away with your questions!
David Wood
09/15/2025, 12:27 PM
Marvin
09/15/2025, 12:28 PM
Marvin
09/15/2025, 12:35 PM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect import futures

@task
def fetch(i):
    # call API, read a file, DB query, etc.
    return i * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def my_flow(n: int = 100):
    futs = [fetch.submit(i) for i in range(n)]  # .submit() schedules tasks and returns PrefectFutures
    # optionally stream results as they complete
    for f in futures.as_completed(futs):
        _ = f.result()  # handle result
    # or wait for all at once
    results = [f.result() for f in futs]
    return results
CPU-bound variant with processes
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(i):
    # pure CPU work
    return i ** 2

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def my_cpu_flow(n: int = 1000):
    futs = [heavy_compute.submit(i) for i in range(n)]
    return [f.result() for f in futs]
Notes
- Max concurrency per flow is bounded by max_workers.
- For ProcessPoolTaskRunner, tasks must be importable at module top-level and arguments/results must be pickleable.
2) Parallelize across flow runs (horizontal scale)
- Best for: running many independent flow runs (e.g., per-partition, per-customer, per-day) in parallel across machines/containers.
- How:
- Create a deployment for your flow and a work pool; run one or more workers that pull work.
- Launch multiple flow runs (via schedule, automation, or programmatically); see the sketch after this list.
- Scale by running more workers and/or increasing work pool concurrency.
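A minimal fan-out sketch, assuming a deployment named "my-flow/prod" already exists and accepts a partition parameter (both the deployment name and the parameter are placeholders):
from prefect import flow
from prefect.deployments import run_deployment

@flow
def fan_out(partitions: list[str]):
    # one independent flow run per partition; workers pick them up as capacity
    # allows (bounded by work pool and worker concurrency limits)
    for key in partitions:
        run_deployment(
            name="my-flow/prod",            # hypothetical deployment name
            parameters={"partition": key},  # hypothetical flow parameter
            as_subflow=False,               # detach from this orchestrating flow
            timeout=0,                      # return immediately; don't wait for completion
        )

if __name__ == "__main__":
    fan_out(["2025-09-01", "2025-09-02", "2025-09-03"])
Scale out by adding workers to the pool; the work-pool concurrency limit below still caps how many of these run at once.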
Useful CLI we verified exists
- Work pool concurrency (limit how many flow runs run at once):
- Set limit: prefect work-pool set-concurrency-limit <pool-name> <limit>
- Clear limit: prefect work-pool clear-concurrency-limit <pool-name>
- Deploy flows: prefect deploy --help
3) Throttle shared resources with central concurrency limits
- Best for: preventing overload of downstreams (APIs, DBs) across all flows and workers.
- Tag your tasks and set a concurrency limit for that tag. Prefect will enforce slots globally.
from prefect import task

@task(tags=["db"])
def write_to_db(row):
    ...
Create a tag limit with the CLI
- Create: prefect concurrency-limit create db --limit 5
- Inspect: prefect concurrency-limit inspect db
- List: prefect concurrency-limit ls
- Reset/delete: prefect concurrency-limit reset db, prefect concurrency-limit delete db
Optional: fine-grained in-code guards
- Use the concurrency context manager inside a task to gate a critical section (useful when you can’t tag the whole task):
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def call_api_safe(i):
    with concurrency("external-api", occupy=1, timeout_seconds=60):
        # only N of these sections run at once (based on the configured limit)
        ...
Marvin
09/15/2025, 12:35 PM
David Wood
09/15/2025, 1:23 PM
Marvin
09/15/2025, 1:23 PM
Marvin
09/15/2025, 1:26 PM
# pip install "prefect-dask dask[distributed]"
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def do_work(i):
    return i * 2

# connect to an existing Dask scheduler (e.g., K8s, ECS, VM)
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def dist_flow(n: int = 100):
    futs = [do_work.submit(i) for i in range(n)]
    return [f.result() for f in futs]
Ray example
# pip install "prefect-ray ray"
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options
@task
def do_work(i):
return i * 2
# connect to Ray head (e.g., ray://<head-svc>:10001) or start local Ray if address=None
@flow(task_runner=RayTaskRunner(address="<ray://ray-head:10001>"))
def dist_flow(n: int = 100):
with remote_options(num_cpus=1): # optional per-task resources
futs = [do_work(i) for i in range(n)]
return [f.result() for f in futs]
Considerations
- Code and environment: cluster workers must have the same code and Python deps (use a common container image for Dask/Ray workers).
- Serialization: task args/returns must be serializable; keep payloads small. Use shared storage for large data and pass references.
- Networking: your Prefect worker/container must reach the Dask scheduler or Ray head address.
- Orchestration features still apply: retries, tagging, and Prefect’s global concurrency limits will gate task starts even on distributed runners.
- Subflows: distributed runners dispatch tasks, not flows. Subflows still execute in the main flow process unless you run them as separate flow runs.
Docs
- Dask integration: Prefect Dask
- Ray integration: Prefect Ray
- Task runners overview: Task runners
- Concurrency limits (useful when distributing): Concurrency limits
If you share where you’re running (Docker, Kubernetes, etc.) and whether you already have Dask or Ray available, I can suggest a concrete setup (e.g., Dask on Kubernetes vs. Ray on Kubernetes, addressing, and container images).
David Wood
09/15/2025, 1:28 PM
Marvin
09/15/2025, 1:28 PM
Marvin
09/15/2025, 1:29 PM
David Wood
09/15/2025, 1:30 PM
Marvin
09/15/2025, 1:30 PM
Marvin
09/15/2025, 1:34 PM
David Wood
09/15/2025, 1:39 PM
Marvin
09/15/2025, 1:40 PM
Marvin
09/15/2025, 1:45 PM
from prefect.workers.process import ProcessWorker

worker = ProcessWorker(
    work_pool_name="my-pool",
    limit=2,  # max concurrent flow runs on this worker
)
worker.start()
- CLI: the worker start command accepts a per-worker concurrency limit (check prefect worker start --help in your environment). It maps to the same limit you see in the worker classes.
- Per work pool
- Cap concurrency across all workers in the pool:
prefect work-pool set-concurrency-limit "my-pool" 10
This prevents more than 10 flow runs from starting at once, regardless of how many workers are running.
Link: Work pools
Notes by worker type
- Process worker: runs flows locally in subprocesses; the per-worker limit is the number of simultaneous subprocesses.
- Docker/Kubernetes/ECS workers: launch remote jobs (containers/pods/tasks). The per-worker limit controls how many flow runs the worker will launch/manage at once; memory is consumed in the launched jobs, not by the worker process itself.
Autoscaling based on memory
- Not natively handled by Prefect. Recommended approaches:
- Kubernetes: use an HPA to scale worker replicas on CPU/memory, and set a work-pool concurrency limit to cap global throughput. Also assign resource requests/limits to flow-run pods in your job template so the scheduler enforces memory per run.
- Other platforms (ECS/VMs): use your platform’s autoscaling on service/container memory/CPU; adjust worker counts and optionally change work-pool concurrency via automation.
- If you want “memory-aware” concurrency, run a small controller that observes memory usage and updates the work pool’s concurrency with:
prefect work-pool set-concurrency-limit "my-pool" <new-limit>
or via the Prefect API.
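A rough sketch of such a memory-aware controller, assuming psutil is installed and simply shelling out to the work-pool CLI shown above (the pool name and thresholds are placeholders):
import subprocess
import time

import psutil  # assumed to be installed alongside the controller

POOL = "my-pool"       # placeholder pool name
HIGH_WATERMARK = 85    # % memory above which we shrink concurrency
LOW_WATERMARK = 60     # % memory below which we grow concurrency
MIN_LIMIT, MAX_LIMIT = 2, 10

def set_limit(limit: int) -> None:
    # reuse the CLI command from above; the Prefect API could be called instead
    subprocess.run(
        ["prefect", "work-pool", "set-concurrency-limit", POOL, str(limit)],
        check=True,
    )

def main(poll_seconds: int = 30) -> None:
    limit = MAX_LIMIT
    set_limit(limit)
    while True:
        used = psutil.virtual_memory().percent
        if used > HIGH_WATERMARK and limit > MIN_LIMIT:
            limit -= 1
            set_limit(limit)
        elif used < LOW_WATERMARK and limit < MAX_LIMIT:
            limit += 1
            set_limit(limit)
        time.sleep(poll_seconds)

if __name__ == "__main__":
    main()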
Related docs
- Work pools and workers: Work pools
- Global concurrency limits: Global concurrency limits
- Kubernetes integration: Prefect Kubernetes
If you share which worker type you’re using (process, docker, kubernetes, ECS) and where you run it, I can give a concrete configuration and autoscaling pattern.
David Wood
09/15/2025, 1:46 PM
Marvin
09/15/2025, 1:46 PM
Marvin
09/15/2025, 1:49 PM
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run, resume_flow_run  # resume usually triggered externally

def token_available() -> bool:
    # your global token gate check
    return False

@flow
def api_flow():
    log = get_run_logger()
    if not token_available():
        log.info("Capacity not available — suspending run to free worker.")
        suspend_flow_run(timeout=3600)  # stop infra; worker slot freed
        # execution will resume here when resumed
    log.info("Proceeding with API call...")
    # call API now
How to resume
- Automation: Create an Automation that resumes suspended runs when your “capacity available” condition is met (e.g., webhook, time window, or custom signal).
- Manual: Use the UI or prefect flow-run resume <id>.
Option B: Use global concurrency/rate limits to gate starts
- Tag the tasks/flows that hit the API and set a global concurrency limit so only N run at once across your whole workspace. Prefect will queue runs and won’t start more than N—preventing extra runs from starting at all.
- This avoids starting runs that would just wait, but if your flow starts and then hits a long wait mid-run, Option A is better.
CLI for limits (verified)
- Create tag limit: prefect concurrency-limit create external-api --limit 2
- Inspect: prefect concurrency-limit inspect external-api
- List: prefect concurrency-limit ls
Tag your task
from prefect import task

@task(tags=["external-api"])
def call_api():
    ...
Option C: Explicit concurrency guard + suspend
- If you need to start the run (e.g., parameterized schedule) but want to immediately release the slot when no capacity:
- Attempt to acquire capacity (your own token service or Prefect concurrency context).
- If not acquired promptly, call suspend_flow_run() and exit.
Notes and trade-offs
- Suspend vs Pause: suspend_flow_run ends the infra/worker process and truly frees the slot; pause_flow_run generally keeps the run “waiting” but is better for short waits. For long or unpredictable waits, use suspend.
- Resumption strategy is key: pair suspension with an Automation or small controller that resumes when your token service says “go.”
- If you can predict a window (e.g., “after 5 minutes”), consider scheduling resumption with an Automation instead of constant polling.
Related links
- Concurrency limits: Concurrency limits
- Rate limits (concept): Rate limits
- Flow suspension/resume reference: Server concepts (and prefect.flow_runs.suspend_flow_run in the API)
If you share how you determine “token available” (own service, third-party API headers, etc.), I can sketch the Automation to resume runs and the minimal code needed to wire this up.
David Wood
09/15/2025, 2:01 PM
Marvin
09/15/2025, 2:01 PM
Marvin
09/15/2025, 2:02 PM
# Set once (CLI)
prefect concurrency-limit create external-api 3 # at most 3 concurrent calls
# In code
from prefect import task

@task(tags=["external-api"])
def call_external_api(payload):
    ...
2) Global rate limit (tokens per second/minute)
- For true request rate control, create a global concurrency limit with slot decay (token bucket), or use the rate_limit helper to pace calls.
- CLI for token-bucket style limit (verified):
prefect global-concurrency-limit create external-api-rps --limit 10 --slot-decay-per-second 10
# Allows ~10 tokens per second across all tasks that consume this limit name
- Consume tokens in code when making a call:
from prefect.concurrency.sync import rate_limit

def call_with_rate_limit(...):
    # rate_limit blocks until a token is available (or raises after timeout_seconds)
    rate_limit("external-api-rps", occupy=1, timeout_seconds=30)
    # make exactly one request here
    ...
- Combine with the tag-based limit to bound bursts and smooth sustained rate:
- Tag enforces “at most N simultaneous requests”
- rate_limit enforces “no more than R requests per second”
Putting it together
- Simple, concurrency-only:
- Use only the task tag + prefect concurrency-limit create external-api N
- Concurrency + RPS:
- Tag the task with “external-api” to cap simultaneous calls
- Call rate_limit("external-api-rps", occupy=1) before each request for RPS control
- This does not cap how many flow runs start; it just gates the API call sections.
Avoid holding worker slots when waiting
- If a flow reaches the API section and must wait a long time (e.g., external token window), call suspend_flow_run() before the API step and resume later when your window opens. This frees the worker slot entirely while preserving flow progress.
Handy links
- Tag-based limits: Concurrency limits
- Global limits (token bucket, slot decay): Global concurrency limits
- Rate limits concept: Rate limits
If you share your target policy (e.g., “max 5 concurrent, 20 req/s sustained, burst 50”), I can suggest concrete CLI values and code snippets.
David Wood
09/15/2025, 2:08 PM
Marvin
09/15/2025, 2:08 PM
Marvin
09/15/2025, 2:10 PM
prefect flow-run resume <flow_run_id>
- Automations: set an Automation that listens for an external signal or time window and calls “Resume Flow Run” when your condition is met (e.g., your token service posts a webhook, or a cron window opens).
- Timeout behavior: if you used a timeout with suspend_flow_run, you can detect expiry via Automations and either resume, cancel, or notify.
Scaling considerations for 100k suspended runs
- DB hygiene: this is primarily a database scaling question.
- Use a managed Postgres with sufficient storage/IOPS.
- Index on flow_run.state_type, updated timestamps, etc., to list/filter Suspended efficiently.
- Prune old state history/logs if retention policies allow.
- For very large installations, consider table partitioning for high-churn tables (logs, states).
- Resume fan-out control: when it’s time to resume many runs, throttle resumption to avoid a sudden thundering herd.
- Use work-pool concurrency limits to cap inflight resumes.
- Batch resumes in waves (Automations or a small controller service) and/or stagger with jitter.
- Observability: emit events/metrics on suspends/resumes so you can monitor queue depth and resume rates.
When to prefer suspend vs. gating starts
- Long, unpredictable waits: suspend_flow_run is ideal—no resources held.
- Short waits or simple throughput control: rely on tag/global concurrency limits or rate limits to keep runs queued or throttled before they begin the API call section.
If you share your expected suspended-run volume and resume burst profile (e.g., “resume up to 500/min”), I can suggest concrete indexes, Automation patterns, and resume batching code.
David Wood
09/15/2025, 2:14 PM
Marvin
09/15/2025, 2:14 PM
Marvin
09/15/2025, 2:15 PM
prefect concurrency-limit create external-api 5
Then tag your task:
from prefect import task

@task(tags=["external-api"])
def call_api(...):
    ...
Tasks beyond 5 will queue in AwaitingConcurrencySlot; flows continue to run but will be blocked at that task.
- Token-bucket rate limit (req/s) with global-concurrency-limit:
prefect global-concurrency-limit create external-api-rps --limit 10 --slot-decay-per-second 10
In code, consume tokens:
from prefect.concurrency.sync import rate_limit

def call_with_rps(...):
    # blocks until a token is available (or raises after timeout_seconds)
    rate_limit("external-api-rps", occupy=1, timeout_seconds=2)
    # make exactly one request here
    ...
- Caveat: this “queue” is orchestration-level; the flow run stays active while waiting for a slot. If you need to free the worker/infrastructure, combine with suspension (below).
2) Free the worker while queued (decoupled, still centralized)
- Try-acquire-then-suspend pattern:
- Attempt to acquire a token/slot quickly; if it times out, suspend the flow so it doesn’t hold a worker slot. Resume later via an Automation or webhook.
from prefect import flow, get_run_logger
from prefect.concurrency.sync import rate_limit
from prefect.flow_runs import suspend_flow_run

@flow
def my_flow(payload):
    log = get_run_logger()
    try:
        # raises if a token can't be acquired within timeout_seconds
        rate_limit("external-api-rps", occupy=1, timeout_seconds=0.25)
        return do_request(payload)
    except Exception:
        log.info("No token available; suspending to free resources.")
        suspend_flow_run(timeout=900)
        # On resume, try again:
        rate_limit("external-api-rps", occupy=1, timeout_seconds=2)
        return do_request(payload)
- Resume strategy: use a scheduled Automation (resume N/s) or a small controller that resumes suspended runs when tokens are available to avoid a thundering herd.
3) Strict FIFO/durable queue (when ordering/durability matters)
- Use an external queue (SQS, Pub/Sub, Redis) as the single “API gateway” queue.
- Main flows enqueue requests and optionally suspend.
- A dedicated Prefect deployment consumes the queue at controlled concurrency/RPS (using the same Prefect limits), and notifies or resumes requestors on completion.
- This gives you real FIFO and decouples waiting entirely from the main flows.
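A rough sketch of the consumer side, assuming SQS and boto3 (the queue URL and message format are placeholders, and the API call itself is elided); the Prefect limits discussed above still gate the actual requests:
import json

import boto3  # assumed available in the consumer's environment
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/api-gateway"  # placeholder

@task(tags=["external-api"])
def process_request(body: dict):
    # acquire a token before each call (see the limits above), then hit the API
    rate_limit("external-api-rps", occupy=1)
    ...  # make the API call for this request

@flow
def consume_queue(max_messages: int = 10):
    sqs = boto3.client("sqs")
    resp = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=max_messages)
    for msg in resp.get("Messages", []):
        process_request(json.loads(msg["Body"]))
        # delete only after successful processing
        sqs.delete_message(QueueUrL=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"]) if False else sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
Run this consumer on a schedule or in a loop from its own deployment so it drains the queue at the pace your limits allow.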
Which to pick
- Need simple global throttle and don’t mind flows idling: use tag-based/global limits.
- Need to release worker resources during long waits: try-acquire-then-suspend + resume.
- Need strict FIFO and durable queuing: external message queue + a dedicated consumer deployment.
Docs
- Concurrency limits: Concurrency limits
- Global limits (token bucket): Global concurrency limits
- Rate limits concept: Rate limits
If you share your exact policy (e.g., “burst 20, sustained 5 r/s” and whether you must free workers), I can sketch a concrete config (limits + Automations) or an SQS-based pattern.
David Wood
09/15/2025, 2:18 PM
Marvin
09/15/2025, 2:18 PM
Marvin
09/15/2025, 2:34 PM
from prefect.events.utilities import emit_event

emit_event(
    event="external.api.capacity.available",
    resource={"prefect.resource.id": "global/external-api"},  # your identifier
    payload={"slots": 5},  # optional, can be used by automations for filtering
)
2) Window-based drain
- Create an Automation that runs on a schedule or when flows have been Suspended for > X minutes. Action: “Resume Flow Run”. Useful for retry windows or quiet periods.
3) Try-acquire-then-suspend loop
- In your flow, attempt to acquire a rate-limit token quickly. If unavailable, call suspend_flow_run(timeout=...) to free infra.
- An Automation resumes runs when capacity returns.
from prefect import flow, get_run_logger
from prefect.concurrency.sync import rate_limit
from prefect.flow_runs import suspend_flow_run

@flow
def api_flow():
    log = get_run_logger()
    try:
        rate_limit("external-api-rps", occupy=1, timeout_seconds=0.2)
        return make_request()
    except Exception:
        log.info("No token; suspending to free worker")
        suspend_flow_run(timeout=900)  # resume later via Automation
        rate_limit("external-api-rps", occupy=1, timeout_seconds=2)
        return make_request()
How resumption works
- The “Resume Flow Run” action transitions the run out of Suspended. Workers will pick it up and the flow continues exactly after the suspension point.
- If you used suspend_flow_run(wait_for_input=...), a programmatic resume can pass `run_input`:
from prefect.flow_runs import resume_flow_run

resume_flow_run(flow_run_id, run_input={"token": "..."})
- In the UI, manual resume can include input; Automations can also pass input derived from event payloads.
Targeting and avoiding thundering herds
- Filter the Automation to only resume runs with specific tags (e.g., external-api), flows, or deployments.
- Combine with work pool concurrency limits to cap how many resumes turn into active runs at once.
- If you need strict “resume N per minute,” use a small controller (cron or webhook handler) that resumes in batches using the API.
Setup in the UI (typical)
- Automations → New Automation
- Trigger: “When a flow run enters state = Suspended” (optionally AND “has tag = external-api”)
- Action: “Resume Flow Run” (optionally add a delay or additional actions like Send Notification)
- Or Trigger: “When event = external.api.capacity.available” and Action: “Resume Flow Run” with filters to target relevant runs
Links and references
- Blog overview of Automations and events: Automations and observability
- Emitting custom events from code: see prefect.events.utilities.emit_event
- Resuming flows programmatically: see prefect.flow_runs.resume_flow_run
If you describe how your “capacity available” signal is produced (webhook, queue depth, token service), I can sketch a concrete Automation config and, if needed, a minimal controller to resume in batches.
David Wood
09/15/2025, 3:53 PM
Marvin
09/15/2025, 3:53 PM
Marvin
09/15/2025, 3:55 PM
suspend_flow_run(timeout=...). Tag the run (e.g., external-api) to identify it.
2) Create a “drain” process:
- Either an Automation that triggers on a periodic schedule or an external controller (tiny script/deployment).
- It lists Suspended runs with your tag, sorts FIFO (by created/updated time), and resumes a bounded batch each tick to avoid thundering herds.
- Work pool concurrency limits ensure only N resume into running at once.
- Minimal controller sketch:
from prefect.client.orchestration import get_client
from prefect.flow_runs import resume_flow_run
import asyncio

BATCH = 20  # resume up to 20 per tick
TAG = "external-api"

async def drain_once():
    async with get_client() as client:
        # Query suspended runs with the tag; adjust filters as needed
        runs = await client.read_flow_runs(
            flow_run_filter={
                "states": {"type": {"any_": ["SUSPENDED"]}},
                "tags": {"all_": [TAG]},
            },
            limit=BATCH,
            sort="CREATED",  # or "EXPECTED_START_TIME"/"UPDATED"
        )
        for fr in runs:
            await resume_flow_run(fr.id)

if __name__ == "__main__":
    asyncio.run(drain_once())
- Run it on a schedule (cron, Cloud Run, K8s CronJob) or behind a webhook.
Option 2: Event-driven resume (still FIFO-ish)
- Emit a custom event when capacity tokens become available. An Automation listens and resumes up to N suspended runs each time.
- You can chain Automations or a small service to ensure you don’t resume more than your allowed rate/concurrency.
Option 3: True FIFO with an external queue
- If strict ordering and durability matter, put requests into SQS/Pub/Sub/Redis.
- Your flows enqueue and suspend. A dedicated Prefect consumer deployment drains the queue at your desired rate/concurrency and resumes or notifies when done.
- This separates fairness strictly from flow scheduling and gives you battle-tested queue semantics.
Keep other flows from hogging capacity
- Use a work-pool concurrency limit for the consumer pool to cap how many resumes become active at once.
- Combine with tag-based limits or rate_limit tokens on the actual API call to enforce throughput.
- The controller/Automation enforces fairness by draining FIFO and in bounded batches.
Why this works
- Suspended runs are just DB rows; they don’t consume worker resources.
- Your “queue” is the set of Suspended runs filtered by a tag; the controller gives you ordering and fairness.
- Resuming in batches + pool concurrency prevents stampedes while ensuring steady progress.
If you tell me your target policy (e.g., FIFO, resume 50/minute, max 5 in-flight), and where you run (Cloud/self-hosted, K8s/etc.), I’ll tailor the exact Automation settings, filters, and a production-ready drain script.
David Wood
09/15/2025, 4:01 PM
Marvin
09/15/2025, 4:02 PM
Marvin
09/15/2025, 4:03 PM
prefect global-concurrency-limit inspect external-api-rps
- Or have your token service emit a custom event with the available count; Automations can react to that event and call a webhook or run a small controller.
- Can it be triggered when something is added to the suspend queue?
- Yes. You can trigger on “flow run entered Suspended state” (and filter by tag/flow/deployment). Typically, you’ll use that signal to wake a controller or increment a metric, then let the controller drain fairly at your chosen rate to avoid stampedes.
- Overhead for suspending and restarting a flow
- Suspension is a state transition persisted in Prefect’s DB; the running infrastructure stops. Overhead is small (DB write + API work).
- Resumption schedules the run again. Actual latency/overhead is dominated by your infrastructure:
- Process worker: typically sub-second to a couple of seconds to re-exec the flow process.
- Docker/Kubernetes/ECS: container/pod/task startup (seconds to tens of seconds).
- Pickup latency also depends on worker polling config. You can lower worker prefetch/poll intervals for faster pickup, but keep them reasonable to avoid heavy API load.
Recommended pattern for a fair, global queue
- Suspend when you can’t get a token, then drain FIFO in bounded batches based on available tokens. Example controller (runs every 500ms if you like):
import asyncio
from prefect.client.orchestration import get_client
from prefect.flow_runs import resume_flow_run

TAG = "external-api"
GCL_NAME = "external-api-rps"
MAX_RESUMES_PER_TICK = 10
SLEEP_SECONDS = 0.5  # 500ms

async def available_tokens(client, gcl_name):
    gcls = await client.read_global_concurrency_limits(name=gcl_name)
    if not gcls:
        return 0
    gcl = gcls[0]
    # available = limit - active_slots (rounded for safety)
    return max(0, int(gcl.limit - (gcl.active_slots or 0)))

async def drain_once():
    async with get_client() as client:
        tokens = await available_tokens(client, GCL_NAME)
        budget = min(tokens, MAX_RESUMES_PER_TICK)
        if budget <= 0:
            return
        runs = await client.read_flow_runs(
            flow_run_filter={
                "states": {"type": {"any_": ["SUSPENDED"]}},
                "tags": {"all_": [TAG]},
            },
            limit=budget,
            sort="CREATED",  # FIFO-ish
        )
        for fr in runs:
            await resume_flow_run(fr.id)

async def main():
    while True:
        try:
            await drain_once()
        finally:
            await asyncio.sleep(SLEEP_SECONDS)

if __name__ == "__main__":
    asyncio.run(main())
Key controls
- Use a tag (e.g., external-api) on flows/tasks that should join the queue.
- Use a global concurrency limit or token-bucket GCL for your API (e.g., external-api-rps) to represent capacity.
- Work-pool concurrency limits cap how many resumes can become active at once, preventing other flows from hogging capacity.
Helpful references
- Concurrency limits: Concurrency limits
- Global limits (token bucket): Global concurrency limits
Marvin
09/15/2025, 4:03 PM
David Wood
09/15/2025, 4:03 PM
Marvin
09/15/2025, 4:07 PM
Marvin
09/15/2025, 4:07 PM
prefect server start
- Don’t use --no-services if you want background services (scheduler, etc.) active.
- Cloud: Automations run in Prefect Cloud’s control plane automatically.
- No-server mode: there’s no “standalone automation” process without a server. For sub-second control or custom logic, run a small controller script that uses the Prefect API; that can run locally anywhere.
When to use which
- If you already use Prefect Cloud: configure Automations in the UI and you’re done.
- If you self-host: run Prefect Server (Docker/K8s/VM or even on your laptop) and define Automations there.
- If you need 500ms cadence or custom fairness: a tiny controller service that calls the API (resume in batches, check available tokens) complements or replaces Automations for that specific loop.
References
- Server concepts and self-hosting: Prefect Server concepts and Self-hosted Prefect
- Automations overview (blog): Automations and observability
Are you on Prefect Cloud or planning to run Prefect Server locally/on your own infra? I can outline the exact setup for your case.
David Wood
09/15/2025, 4:31 PM
Marvin
09/15/2025, 4:32 PM
Marvin
09/15/2025, 4:33 PM
from prefect import task
from prefect.deployments.flow_runs import run_deployment

@task
def call_external_api(...):
    # ... do the API work ...
    # fire-and-forget a drain run (as a separate flow run)
    run_deployment(
        name="drain-controller/drain",
        parameters={"budget": 10},  # how many to resume per tick
        as_subflow=False,
    )
Drain flow (sketch):
from prefect import flow, get_run_logger
from prefect.concurrency.sync import concurrency
from prefect.client.orchestration import get_client
from prefect.flow_runs import resume_flow_run
import asyncio

TAG = "external-api"           # tag you applied to queued runs
GCL_NAME = "external-api-rps"  # token-bucket global limit name

@flow
def drain(budget: int = 10):
    log = get_run_logger()
    # distributed mutex to prevent multiple drains running concurrently
    with concurrency("resume-controller", occupy=1, timeout_seconds=0):
        asyncio.run(_drain_async(budget))
    log.info("Drain complete")

async def _drain_async(budget: int):
    async with get_client() as client:
        # Optionally compute available tokens from a global limit
        gcls = await client.read_global_concurrency_limits(name=GCL_NAME)
        if gcls:
            available = max(0, int(gcls[0].limit - (gcls[0].active_slots or 0)))
            budget = min(budget, available) if available else 0
        if budget <= 0:
            return
        # FIFO-ish: fetch the oldest suspended runs for our tag
        runs = await client.read_flow_runs(
            flow_run_filter={
                "states": {"type": {"any_": ["SUSPENDED"]}},
                "tags": {"all_": [TAG]},
            },
            limit=budget,
            sort="CREATED",
        )
        for fr in runs:
            await resume_flow_run(fr.id)
Why this works
- Your task stays simple; the drain logic is centralized, fair, and capped.
- The global mutex (concurrency("resume-controller") with a limit of 1) avoids multiple drains racing when many tasks finish at once.
- You can tie the “budget” to available tokens from a global limit or keep it fixed.
Option B: Emit an event, let an Automation run the drain
- At the end of the API task, emit a custom event (e.g., “external.api.capacity.released”).
- An Automation listens for that event and runs the “drain-controller” deployment. This decouples your task from orchestration specifics.
In your task:
from prefect import task
from prefect.events.utilities import emit_event

@task
def call_external_api(...):
    # ... do API work ...
    emit_event(
        event="external.api.capacity.released",
        resource={"prefect.resource.id": "global/external-api"},
        payload={"hint": "ok"},  # optional
    )
Automation
- Trigger: event = external.api.capacity.released
- Action: Run Deployment (drain-controller/drain) with parameters
Notes and guardrails
- Keep the drain batch small and frequent rather than large and rare to avoid stampedes.
- Combine with work-pool concurrency limits to cap how many resumes become active at once.
- If you use a token-bucket global limit (slot decay), the “available tokens” check lets you resume only what the system can sustainably process.
Docs
- Global concurrency limits (token bucket): Global concurrency limits
- Concurrency locks in code: Concurrency limits
Marvin
09/15/2025, 4:33 PM
David Wood
09/15/2025, 4:33 PM
Marvin
09/15/2025, 4:33 PM
Marvin
09/15/2025, 4:34 PM
prefect work-queue create ...
- prefect work-queue ls
- prefect work-queue inspect <id>
- prefect work-queue pause <id> / prefect work-queue resume <id>
- prefect work-queue preview <id>
- Per-queue concurrency limit:
- prefect work-queue set-concurrency-limit <id> <limit>
- prefect work-queue clear-concurrency-limit <id>
Important notes
- These are orchestration queues, not a general-purpose FIFO like SQS. Prefect schedules/prioritizes runs (typically by scheduled time/priority) and stores state durably in its database.
- A deployment targets a work pool (and optionally a work queue); workers started for that pool/queue will pick up runs.
- Use pool/queue concurrency to control how many flow runs start at once; use task/tag/global limits to throttle shared resources within runs.
Docs
- Work pools and routing overview: Work pools
- CLI reference (queues): run prefect work-queue --help for the full command set in your environment.
If you tell me your routing goals (e.g., split by environment, priority tiers), I can suggest a pool/queue layout and how to target them from deployments.
David Wood
09/15/2025, 7:43 PM
Marvin
09/15/2025, 7:43 PM
Marvin
09/15/2025, 7:43 PM
from prefect import flow, task, futures
from prefect.task_runners import ThreadPoolTaskRunner

@task(retries=3, retry_delay_seconds=5, persist_result=False)
def process_row(row):
    # call APIs, transform, write out
    return row["id"]

def chunk(iterable, size):
    buf = []
    for x in iterable:
        buf.append(x)
        if len(buf) == size:
            yield buf
            buf = []
    if buf:
        yield buf

@flow(task_runner=ThreadPoolTaskRunner(max_workers=64))
def pipeline(rows, batch_size: int = 5_000):
    for batch in chunk(rows, batch_size):
        futs = [process_row.submit(r) for r in batch]
        for f in futures.as_completed(futs):
            _ = f.result()  # handle/log result
- Partitioned flows (when total items are huge)
- Create a deployment and launch many flow runs, each responsible for a partition (e.g., per file/day/shard/customer) of, say, 10k–100k rows. Inside each run, use task-level parallelism as above.
- This keeps orchestration objects to thousands of flow runs, not millions.
- Streaming/consumer pattern (when rows arrive continuously)
- Use an external queue (SQS/Pub/Sub/Kafka) and run a consumer flow/deployment that pulls messages and processes with controlled concurrency. This decouples producers from processing and avoids spawning a flow run per row.
Why avoid a flow-per-row
- Startup/pickup latency per flow run adds up.
- Massive DB pressure (run/state/log rows).
- Harder to observe and manage millions of runs.
- Cost/throughput inefficiency vs. batching.
Throughput and fairness controls
- Protect shared services: tag the API task and set a concurrency limit, and/or use a global rate limit (token bucket).
- For long waits (rate limits, tokens): try-acquire then suspend the flow run and resume later so you don’t hold worker slots.
- Use work-pool concurrency limits to cap how many flow runs execute at once; inside each flow, use task runners to parallelize efficiently.
When would per-row flows make sense?
- If each row must be a fully isolated, independently retryable, auditable unit with distinct SLAs and you’re only dealing with thousands (not millions). Even then, consider “micro-batching” (e.g., 100–1,000 rows per flow run) to control overhead.
Helpful links
- Task runners: Task runners
- Dask integration: Prefect Dask
- Ray integration: Prefect Ray
- Concurrency limits: Concurrency limits
If you can share:
- Approx rows, per-row cost (I/O vs CPU), and required throughput
- Whether you have a Dask/Ray cluster
- Any strict per-row audit/traceability needs
…I can suggest concrete batch sizes, runner choice, and concurrency settings.
David Wood
09/15/2025, 8:51 PM
Marvin
09/15/2025, 8:51 PM
Marvin
09/15/2025, 8:54 PM
import os
import gc
import psutil
from prefect import flow, get_run_logger

def rss_mb():
    return psutil.Process(os.getpid()).memory_info().rss / (1024 ** 2)

@flow
def measure_baseline():
    logger = get_run_logger()
    gc.collect()
    logger.info(f"RSS at flow start: {rss_mb():.1f} MB")
    # Import your typical libraries to see the true baseline
    # import pandas as pd; import numpy as np; ...
    gc.collect()
    logger.info(f"RSS after imports: {rss_mb():.1f} MB")

if __name__ == "__main__":
    measure_baseline()
Run a few of these concurrently to see aggregate usage. On Docker/K8s you can also compare with docker stats or kubectl top pod.
References:
- Manage work pools and workers (Prefect 3.x)
- Work pools and workers (blog overview)
- Prefect 3 GA notes
David Wood
10/03/2025, 10:25 AM
Marvin
10/03/2025, 10:25 AM
Marvin
10/03/2025, 10:29 AM
export PREFECT_API_URL=http://localhost:4200/api
- PowerShell:
$env:PREFECT_API_URL = "http://localhost:4200/api"
Note on Docker networking:
- If your worker runs on your host machine, use http://localhost:4200/api.
- If your worker runs in another Docker container, use the Docker service name or container hostname, e.g. http://prefect-server:4200/api.
Docs: Manage Settings
Step 2 — Create a work pool
- Process worker (no containers, runs Python locally):
prefect work-pool create -t process process-pool
- Docker worker (runs each flow in a container):
prefect work-pool create -t docker docker-pool
Tip: For Docker worker, install extras: pip install "prefect[docker]"
Step 3 — Start a worker
- Process worker:
prefect worker start -p process-pool
- Docker worker:
prefect worker start -p docker-pool
Step 4 — Create a deployment for your flow
Option A: Python API (nice for quick starts)
Example flow file: flows/my_flow.py
from prefect import flow

@flow
def my_flow(name: str = "world"):
    print(f"Hello, {name}!")

if __name__ == "__main__":
    my_flow()
Process worker deployment (local code):
from prefect.flows import Flow

Flow.from_source(
    ".",                         # path to your repo or project root
    "flows/my_flow.py:my_flow",  # entrypoint: file_path:flow_name
).deploy(
    name="hello-process",
    work_pool_name="process-pool",
    parameters={"name": "Marvin"},
)
Docker worker deployment (containerized):
from prefect.flows import Flow

Flow.from_source(
    ".",
    "flows/my_flow.py:my_flow",
).deploy(
    name="hello-docker",
    work_pool_name="docker-pool",
    image="your-docker-user/prefect-flows:latest",  # must be pullable by the worker
    build=True,  # build an image from your project
    push=True,   # push image to your registry
    parameters={"name": "Marvin"},
)
Option B: CLI via prefect.yaml
Initialize once:
prefect init
Edit prefect.yaml to set your work pool (process-pool or docker-pool), entrypoint, and optionally an image/build stanza for Docker.
Then:
prefect deploy
Docs:
- prefect.yaml deployments
- Workers API reference
- Blog overview: Work pools and workers
Step 5 — Trigger a run
- From the UI: Navigate to your deployment and click Run.
- From the CLI:
prefect deployment run "my_flow/hello-process"
# or
prefect deployment run "my_flow/hello-docker"
Notes and common gotchas
- Ensure the worker environment has the same code and dependencies as your flow. For Process workers, the code must be available on disk; for Docker workers, the image must include your code and dependencies.
- If your server is in Docker Compose and your worker is in another Compose service, set PREFECT_API_URL to the service name (e.g., http://server:4200/api).
- If you see “No worker found…” or runs stay “Late,” your worker likely isn’t connected to the right work pool or cannot reach the API (PREFECT_API_URL).
Would you like to run with a Process worker or a Docker worker? Also, where will your worker run (host machine or another container)? I can tailor the exact commands and a minimal prefect.yaml for your setup.