roshan Kumar
09/07/2025, 7:25 AM

Marvin
09/07/2025, 7:25 AM

Marvin
09/07/2025, 7:26 AM

Marvin
09/07/2025, 7:32 AM
1) Suspend the flow run and resume it later (frees the worker while waiting)
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run, resume_flow_run

def submit_job() -> str:
    # call external API and return a job_id
    ...

def fetch_result(job_id: str):
    ...

def process(result):
    ...

@flow
def my_flow():
    logger = get_run_logger()
    job_id = submit_job()
    logger.info(f"Submitted job {job_id}; suspending for later resume")
    # This sets the flow to a suspended/paused state and frees the worker
    suspend_flow_run(timeout=600)  # e.g., plan to resume within 10 minutes
    # When resumed, continue here
    result = fetch_result(job_id)
    process(result)
Ways to resume:
- Manually from the UI, or via API using resume_flow_run(flow_run_id, run_input=...) (sketch below)
- Preferably via an Automation (time-based or event-based) in Prefect Cloud
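For the API route, a minimal sketch (the flow run ID is a placeholder you would persist when the run suspends):
from prefect.flow_runs import resume_flow_run

resume_flow_run("your-flow-run-id")  # placeholder ID
# If the run suspended with wait_for_input=..., pass the expected fields:
# resume_flow_run("your-flow-run-id", run_input={"payload_url": "..."})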
Links:
- Interactive (pause/suspend) flows
- Flow run resume API
- Work pools and workers
2) Split into two flows (schedule the second and exit the first)
- Flow A: submit the API job, then schedule Flow B 10 minutes in the future and exit immediately.
- Flow B: fetch results and do the CPU-bound processing.
- This completely frees the worker during the “wait”.
Example:
from datetime import datetime, timedelta, timezone
from prefect import flow
from prefect.deployments import run_deployment

def submit_job() -> str:
    ...

@flow
def submit_flow():
    job_id = submit_job()
    # Schedule a follow-up run 10 minutes later
    run_deployment(
        name="process/results",  # "<flow-name>/<deployment-name>" of your processing flow
        parameters={"job_id": job_id},
        scheduled_time=datetime.now(timezone.utc) + timedelta(minutes=10),
    )
    # Done — frees the worker now

@flow
def process_flow(job_id: str):
    result = fetch_result(job_id)  # fetch_result/process as defined earlier
    process(result)
Optimizing the CPU-bound part
- Inside the processing flow, use a process-based task runner for parallel CPU work:
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(chunk):
    ...

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow(job_id: str):
    data = fetch_result(job_id)
    # fan out work across processes, then gather the results
    futures = [heavy_compute.submit(c) for c in data.chunks()]
    return [f.result() for f in futures]
What this gives you
- During the 10-minute wait, no worker slot is occupied (either because the flow is suspended or because the first flow has exited after scheduling the follow-up).
- Your worker can process far more than 10 “in-flight” jobs, since it’s not blocked by waits.
- You can route the CPU-bound processing to a separate work pool/worker with concurrency tuned to your machine’s cores, while keeping a different pool for submission/scheduling.
Notes
- Async/await for the API call can make it non-blocking within a single flow run, but the flow run still occupies a worker slot. To truly free worker capacity, use suspend/pause or the "split flows" pattern.

Marvin
09/07/2025, 7:32 AM

roshan Kumar
09/07/2025, 7:37 AM

Marvin
09/07/2025, 7:37 AM

Marvin
09/07/2025, 7:39 AM
1) Webhook/callback-driven resume
- Include the flow_run_id in the job's callback URL.
- Call suspend_flow_run(wait_for_input=...) to put the flow in a Suspended state and release the worker.
- When your external system finishes, your webhook handler calls resume_flow_run(...) with the flow_run_id and the expected input to immediately resume the flow and continue.
Flow:
from pydantic import BaseModel
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run
from prefect.runtime import flow_run

# Typed input you expect when resuming
class JobReady(BaseModel):
    payload_url: str

def submit_job(callback_url: str) -> str:
    # Submit to external system, include callback_url; return job_id
    ...

def fetch_result(url: str):
    ...

def process(result):
    ...

@flow
def my_flow():
    logger = get_run_logger()
    fr_id = flow_run.id  # prefect.runtime exposes this as an attribute, not a call
    callback_url = f"https://my-service/callback?flow_run_id={fr_id}"
    job_id = submit_job(callback_url=callback_url)
    logger.info(f"Submitted job {job_id}; suspending until callback")
    # Free the worker until input arrives; set timeout=None for indefinite wait
    ready: JobReady = suspend_flow_run(wait_for_input=JobReady, timeout=None)
    # Resumes here when your webhook sends input
    result = fetch_result(ready.payload_url)
    process(result)
Webhook handler (runs in your API service):
from prefect.flow_runs import resume_flow_run

def handle_callback(flow_run_id: str, payload_url: str):
    # Resuming with run_input unblocks the suspended flow immediately
    resume_flow_run(
        flow_run_id,
        run_input={"payload_url": payload_url},
    )
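For illustration, a minimal sketch of exposing handle_callback over HTTP, assuming FastAPI (any web framework works; the route and parameter names are illustrative and match the callback_url built above):
from fastapi import FastAPI
from prefect.flow_runs import resume_flow_run

app = FastAPI()

@app.post("/callback")
def callback(flow_run_id: str, payload_url: str):
    # Query parameters arrive from the callback URL the flow registered
    resume_flow_run(flow_run_id, run_input={"payload_url": payload_url})
    return {"status": "resumed"}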
Notes:
- Get the current run id inside the flow via prefect.runtime.flow_run.id.
- Use a secret or signed token in the callback to prevent spoofing (a sketch follows below).
- You can set timeout on suspend_flow_run if you want a safety net (e.g., mark as failed/alert if input never arrives).
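A minimal sketch of the signed-token idea using Python's stdlib (the shared secret and token scheme are assumptions, not Prefect APIs):
import hashlib
import hmac

SHARED_SECRET = b"replace-with-a-real-secret"  # assumed to be distributed out-of-band

def sign(flow_run_id: str) -> str:
    # Token to embed in the callback URL at submit time
    return hmac.new(SHARED_SECRET, flow_run_id.encode(), hashlib.sha256).hexdigest()

def verify(flow_run_id: str, token: str) -> bool:
    # Constant-time comparison before resuming the run
    return hmac.compare_digest(sign(flow_run_id), token)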
Links:
- Interactive (pause/suspend) flows
- Flow run resume API
- Work pools and workers
2) Split “submit” and “process” into separate flows (small polling windows)
If you can’t get a callback, run many short, non-blocking “check” runs:
- Flow A submits the job and schedules Flow B to run soon.
- Flow B checks readiness; if not ready, it quickly reschedules itself a bit later and exits (no long wait). When ready, it fetches and processes.
This uses brief runs instead of holding a slot for minutes, allowing your worker to cycle through more runs.
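A sketch of a backoff variant, assuming the poller deployment accepts an attempt parameter (the deployment name is hypothetical; check_ready and process_flow appear in the examples below):
from datetime import datetime, timedelta, timezone
from prefect import flow
from prefect.deployments import run_deployment

@flow
def poll_with_backoff(job_id: str, attempt: int = 0):
    if check_ready(job_id):
        process_flow(job_id)
    else:
        delay = min(15 * 2 ** attempt, 300)  # 15s, 30s, 60s, ... capped at 5 minutes
        run_deployment(
            name="poll-with-backoff/default",  # hypothetical "<flow>/<deployment>" name
            parameters={"job_id": job_id, "attempt": attempt + 1},
            scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=delay),
        )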
Parallelize the CPU work
Use a process pool to fan out CPU-bound tasks:
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(chunk):
    ...

def chunk_result(result):
    # split the payload into independent work units
    ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))  # tune to your cores
def process_result(result):
    chunks = chunk_result(result)
    futs = [heavy_compute.submit(c) for c in chunks]
    return [f.result() for f in futs]
Why this helps your concurrency
- Suspended/paused flows do not occupy a worker slot, so your worker can accept far more than 10 “in-flight” jobs even with a concurrency limit of 10.
- Use one work pool/worker for submission (high concurrency, lightweight) and another for processing (concurrency ≈ CPU cores) to avoid CPU contention.
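A sketch of the two-pool setup with Prefect's CLI (pool names and limits are illustrative):
# lightweight pool for submission/scheduling flows
prefect work-pool create io-pool --type process
prefect work-pool set-concurrency-limit io-pool 50

# CPU-bound pool sized to the machine's cores
prefect work-pool create cpu-pool --type process
prefect work-pool set-concurrency-limit cpu-pool 8

# start one worker per pool
prefect worker start --pool io-pool
prefect worker start --pool cpu-pool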
If you share:
- whether you can add a webhook/callback from the external system,
- your worker machine core counts,

Marvin
09/07/2025, 7:39 AM
I can then suggest a specific ProcessPoolTaskRunner(max_workers=...) setting.

Marvin
09/07/2025, 7:43 AM
1) Defer the run with suspend_flow_run
- suspend_flow_run transitions the run into a suspended state and frees the worker slot.
- After the timeout (or an explicit resume), the run is rescheduled and picked up again by a worker to continue.
- This allows your worker to process far more than 10 "in-flight" jobs because the waiting runs won't consume concurrency slots.
Example:
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run

def submit_job_to_api() -> str:
    # Your API submission that returns a job_id
    ...

def fetch_results(job_id: str):
    # Your API call to fetch/process results
    ...

def process_cpu_bound(payload):
    # Your CPU-bound processing
    ...

@flow
def my_flow():
    logger = get_run_logger()
    job_id = submit_job_to_api()
    logger.info(f"Submitted job {job_id}; deferring for 10 minutes (frees worker slot)")
    # Defer this run for 10 minutes; it will not occupy a worker while waiting
    suspend_flow_run(timeout=600, key=f"wait-for-{job_id}")
    # After timeout or explicit resume, the run is picked back up here
    payload = fetch_results(job_id)
    process_cpu_bound(payload)
- Docs: suspend_flow_run API
Advanced: If your external system can notify you when ready, you can resume earlier than the timeout using the API/UI and optionally pass input. (The run will consume a worker slot again only when it resumes.)
2) If you must poll until ready, use repeated deferrals
- Instead of time.sleep, loop with small suspend_flow_run timeouts to check status. Each suspend frees the worker between polls. Vary the key each iteration, since suspends that reuse a key are skipped:
attempt = 0
while True:
    if check_status(job_id) == "ready":
        break
    attempt += 1
    suspend_flow_run(timeout=60, key=f"poll-{job_id}-{attempt}")
3) Consider splitting into two flows (optional architecture)
- Flow A: submit the job and exit (or suspend until ready).
- Flow B: do CPU-bound processing when the job is ready (triggered by an automation or an explicit resume/run_deployment).
- This lets you place CPU-heavy work into a separate work pool with its own (smaller) concurrency while allowing many more “waiting” jobs to exist without consuming slots.
Other notes
- Simply making the API call "async" won't help with work-pool concurrency. An async await asyncio.sleep(600) still occupies a flow-run slot. Deferral via suspend_flow_run is what frees the slot.
- If you truly need more actively running flows (not just waiting), increase the work pool concurrency or add more workers. But for long waits, deferral scales better.
- You can also add global concurrency limits for the CPU-bound step if you need to protect resources: Global concurrency limits
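A sketch of guarding the CPU-bound step with a global concurrency limit (the "cpu-heavy" limit name is an assumption; create the limit first via the UI or CLI):
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def heavy_compute(chunk):
    # Occupy one slot of the "cpu-heavy" limit for the duration of the work
    with concurrency("cpu-heavy", occupy=1):
        ...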
Would you prefer a time-based resume (e.g., after 10 minutes) or can your external system call back to resume the flow when the job is ready? I can share a snippet for resuming via the Prefect API if that helps.

roshan Kumar
09/07/2025, 7:46 AM

Marvin
09/07/2025, 7:46 AM

Marvin
09/07/2025, 7:47 AM
Option A: A callback triggers the processing flow directly (no waiting at all)
- Submit the job and exit; when the external system calls back, trigger the processing deployment with the job_id.
- This gives you near-zero latency whether the job takes 1 second or 10 minutes, and your work-pool concurrency isn't consumed while waiting.
Flow to submit:
from prefect import flow, get_run_logger

def submit_job_to_api() -> str:
    # submit to external system and return job_id
    ...

@flow
def submit_flow():
    logger = get_run_logger()
    job_id = submit_job_to_api()
    logger.info(f"Submitted external job: {job_id}")
    # done — worker slot is immediately free
Webhook handler (your API service):
from prefect.deployments import run_deployment

def handle_external_callback(job_id: str):
    # Trigger the processing flow immediately when the job is ready
    run_deployment(
        name="process/results",  # your processing flow's "<flow>/<deployment>" name
        parameters={"job_id": job_id},
    )
- Make sure the service that runs handle_external_callback has PREFECT_API_URL set to your self-hosted Prefect API.
- You can also use the REST API if you prefer HTTP over Python.
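A minimal sketch of the REST route, assuming a self-hosted server (the URL and deployment ID are placeholders; look the ID up with prefect deployment inspect):
import httpx

PREFECT_API_URL = "http://prefect-server:4200/api"  # placeholder self-hosted API URL
DEPLOYMENT_ID = "00000000-0000-0000-0000-000000000000"  # placeholder deployment UUID

def trigger_processing(job_id: str):
    # POST /deployments/{id}/create_flow_run creates a run to be picked up immediately
    resp = httpx.post(
        f"{PREFECT_API_URL}/deployments/{DEPLOYMENT_ID}/create_flow_run",
        json={"parameters": {"job_id": job_id}},
    )
    resp.raise_for_status()
    return resp.json()["id"]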
Option B: Self-rescheduling “poller” flow (short runs, no long waits)
- If you can’t get a callback, don’t sleep for minutes. Instead, run quick “check” flow runs that reschedule themselves and exit when not ready.
- The worker slot is only used for a brief check each time.
Submit + poll:
from datetime import datetime, timedelta, timezone
from prefect import flow
from prefect.deployments import run_deployment

def submit_job_to_api() -> str:
    ...

def check_ready(job_id: str) -> bool:
    ...

@flow
def submit_flow():
    job_id = submit_job_to_api()
    # Kick off the first check soon
    run_deployment(
        name="poll-or-process",  # your "<flow>/<deployment>" name
        parameters={"job_id": job_id},
        scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=5),
    )

@flow
def poll_or_process(job_id: str):
    if check_ready(job_id):
        process_flow(job_id)  # see below
    else:
        # Re-schedule a short, non-blocking check and exit
        run_deployment(
            name="poll-or-process",  # your "<flow>/<deployment>" name
            parameters={"job_id": job_id},
            scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=15),
        )
Parallelize the CPU-bound step
- Put the heavy work in a dedicated processing flow with a process pool.
- Run that flow in a separate work pool whose concurrency roughly matches your CPU cores, so you don’t overload the machine even if you have many “waiting” jobs.
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(chunk):
    ...

def fetch_results(job_id: str):
    ...

def chunkify(data):
    # split the payload into independent work units
    ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))  # tune to core count
def process_flow(job_id: str):
    data = fetch_results(job_id)
    chunks = chunkify(data)
    futures = [heavy_compute.submit(c) for c in chunks]
    results = [f.result() for f in futures]
    return results
Why these work
- No long-running sleeps. Worker slots are used only for brief submits or checks, so your 10-slot pool can juggle hundreds of “in-flight” jobs.
- The processing stage is isolated and tuned for CPU, either by its own work pool or via ProcessPoolTaskRunner(max_workers=...).
- If your API can provide an ETA, schedule the next check closer to the expected ready time to reduce polling (see the sketch below).
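A sketch of the ETA idea (get_job_eta is a hypothetical helper returning seconds until the job is ready):
from datetime import datetime, timedelta, timezone
from prefect.deployments import run_deployment

def schedule_next_check(job_id: str):
    eta_seconds = get_job_eta(job_id)  # hypothetical external-API estimate
    run_deployment(
        name="poll-or-process",  # your "<flow>/<deployment>" name
        parameters={"job_id": job_id},
        # aim slightly past the ETA, but never sooner than 5s from now
        scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=max(eta_seconds, 5)),
    )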
Links for reference:
- Work pools and workers
- Global concurrency limits (optional, to protect CPUs if needed)
Quick follow-ups:
- Can your external system call a webhook? If yes, Option A is simplest and most responsive.

Marvin
09/07/2025, 7:47 AM
- If not, share your core counts and I can suggest a max_workers value and work-pool concurrency for the processing step.

roshan Kumar
09/07/2025, 7:53 AM

Marvin
09/07/2025, 7:53 AM

Marvin
09/07/2025, 7:54 AM
Batch multiple inputs into one flow run: async for the I/O waits, processes for the CPU work.
import asyncio
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

# --- I/O stage: async submission + readiness check ---
async def submit_async(item) -> str:
    # call external API asynchronously; return job_id
    ...

async def wait_until_ready_async(job_id: str):
    # efficient async polling/backoff until ready
    ...

@task
async def submit_and_wait(item):
    job_id = await submit_async(item)
    await wait_until_ready_async(job_id)
    return job_id

# --- CPU stage: parallelize with processes ---
@task
def heavy_compute(chunk):
    ...

def chunkify(result): ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))  # tune to cores
async def batch_flow(items: list):
    # 1) Fan out many API jobs concurrently in ONE flow run
    # (.fn calls the underlying function directly, bypassing task orchestration)
    ready_job_ids = await asyncio.gather(*(submit_and_wait.fn(i) for i in items))
    # 2) Fetch + process each result; CPU step uses multiple processes
    results = []
    for jid in ready_job_ids:
        data = ...  # fetch result for jid (can be async too)
        chunks = chunkify(data)
        futures = [heavy_compute.submit(c) for c in chunks]
        results.extend([f.result() for f in futures])
    return results
Other ways to raise throughput without suspend/resume:
- Increase work pool concurrency and/or add more workers so you can run more than 10 flow runs concurrently. This does not reduce waiting cost, it just scales out capacity.
- Move the waiting out of Prefect entirely (external orchestrator + callback triggers the processing flow) — you said this has issues for your use case, so mentioning for completeness.
Key takeaway
- Async helps you pack more I/O into a single running flow, but it does not free the worker slot. If you must keep “one input per flow run” and you won’t use suspend/resume or self-rescheduling, the only way to run more than 10 simultaneous runs is to raise the pool’s concurrency or add workers.
- If batching is viable, async batching + process-pool CPU parallelism in a single run is the best way to dramatically increase throughput without changing your work-pool concurrency.
Would batching multiple inputs per flow run be acceptable for your workload? If so, how many items could you group per run, and how many cores are on your worker hosts? I can help you tune max_workers and the batching pattern. For reference: Work pools and workers.

roshan Kumar
09/07/2025, 8:29 AM

Marvin
09/07/2025, 8:29 AM

Marvin
09/07/2025, 8:30 AM
Putting the self-rescheduling "submit, then poll-or-process" pattern together:
from datetime import datetime, timedelta, timezone
from prefect import flow
from prefect.deployments import run_deployment
def submit_job_to_api() -> str: ...
def check_ready(job_id: str) -> bool: ...
def fetch_result(job_id: str): ...
def process(result): ...

@flow
def submit_flow():
    job_id = submit_job_to_api()
    run_deployment(
        name="poll-or-process",  # your "<flow>/<deployment>" name
        parameters={"job_id": job_id},
        scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=5),
    )

@flow
def poll_or_process(job_id: str):
    if check_ready(job_id):
        result = fetch_result(job_id)
        process(result)
    else:
        run_deployment(
            name="poll-or-process",  # your "<flow>/<deployment>" name
            parameters={"job_id": job_id},
            scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=15),
        )
If you must keep “one input per flow run” and you won’t use suspend/resume:
- Async won’t free the worker slot. It can help only if you batch many items into a single flow run and await many API calls concurrently inside that single run.
- Otherwise, to run more than 10 simultaneous “waiting” flows, you’d need to increase work-pool concurrency or add workers, which scales cost rather than eliminating idle time.
References:
- Interactive (pause/suspend) flows
- Work pools and workers
If you can share what specifically makes suspend/resume a non-starter (e.g., infra, security, operational constraints), I can suggest a variant that fits those constraints (e.g., timeout-only deferral, event/webhook triggers, or mixed polling with backoff).