# ask-marvin
@Marvin I have a flow which does two tasks: 1. an API call that waits for up to 10 minutes synchronously, and 2. processing the results, which is a CPU-bound operation. I have the worker concurrency limit set to 10 and each worker is of type process. How can I make the API call non-blocking so a worker can process many more than 10 flows, since during the API call the worker process mostly just waits?
thought for 414.4 seconds
thought for 1009.7 seconds
Great question: you're running into the classic "sleeping flow holds a worker slot" problem.

Context (assuming Prefect 3.x): a worker's concurrency limit controls how many flow runs it can execute at once. If your flow "waits 10 minutes" (sleep/poll), that flow run continues to occupy one of those slots even though it is idle. To make the wait non-blocking, restructure the flow so the long wait does not hold an active run. Here are two proven patterns:

1) Pause/suspend the flow, then resume later (frees the worker)
- After you submit the API job, suspend the flow so the worker/infrastructure can exit.
- Resume the flow when it's time to continue (e.g., after 10 minutes or when the external system is ready).
- In Prefect Cloud, you can use an Automation to resume the flow after a delay or on an event/webhook.

Example:
```python
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run, resume_flow_run

def submit_job() -> str:
    # call external API and return a job_id
    ...

def fetch_result(job_id: str):
    ...

def process(result):
    ...

@flow
def my_flow():
    logger = get_run_logger()
    job_id = submit_job()
    <http://logger.info|logger.info>(f"Submitted job {job_id}; suspending for later resume")
    # This sets the flow to a suspended/paused state and frees the worker
    suspend_flow_run(timeout=600)  # e.g., plan to resume within 10 minutes

    # When resumed, continue here
    result = fetch_result(job_id)
    process(result)
```

Ways to resume:
- Manually from the UI, or via the API using `resume_flow_run(flow_run_id, run_input=...)` (a minimal sketch follows below)
- Preferably via an Automation (time-based or event-based) in Prefect Cloud
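For instance, a minimal sketch of resuming a suspended run from any process that can reach your Prefect API; the helper name is an illustrative assumption:
```python
from prefect.flow_runs import resume_flow_run

def resume_waiting_run(flow_run_id: str) -> None:
    # moves the Suspended run back to Scheduled so a worker picks it up again
    resume_flow_run(flow_run_id)
```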
Links:
- Interactive (pause/suspend) flows
- Flow run resume API
- Work pools and workers

2) Split into two flows (schedule the second and exit the first)
- Flow A: submit the API job, then schedule Flow B 10 minutes in the future and exit immediately.
- Flow B: fetch results and do the CPU-bound processing.
- This completely frees the worker during the "wait".

Example:
```python
from datetime import datetime, timedelta
from prefect import flow
from prefect.deployments import run_deployment

def submit_job() -> str:
    ...

def fetch_result(job_id: str):
    ...

def process(result):
    ...

@flow
def submit_flow():
    job_id = submit_job()
    # Schedule a follow-up run 10 minutes later
    run_deployment(
        name="process/results",  # "<flow-name>/<deployment-name>" of your processing flow
        parameters={"job_id": job_id},
        scheduled_time=datetime.utcnow() + timedelta(minutes=10),
        timeout=0,  # fire-and-forget: don't wait for the scheduled run to finish
    )
    # Done — frees the worker now

@flow
def process_flow(job_id: str):
    result = fetch_result(job_id)
    process(result)
```

Optimizing the CPU-bound part
- Inside the processing flow, use a process-based task runner for parallel CPU work:
```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(chunk):
    ...

def fetch_result(job_id: str):
    ...

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow(job_id: str):
    data = fetch_result(job_id)
    # fan out work across processes, then gather the results
    futures = [heavy_compute.submit(c) for c in data.chunks()]
    return [f.result() for f in futures]
```

What this gives you
- During the 10-minute wait, no worker slot is occupied (either because the flow is suspended or because the first flow has exited after scheduling the follow-up).
- Your worker can process far more than 10 "in-flight" jobs, since it's not blocked by waits.
- You can route the CPU-bound processing to a separate work pool/worker with concurrency tuned to your machine's cores, while keeping a different pool for submission/scheduling; a deployment sketch follows below.
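For example, a minimal sketch of deploying the two flows to separate work pools; the source URL, entrypoints, and pool names are illustrative assumptions:
```python
from prefect import flow

# assumes the flow code lives in a repo the workers can pull from
if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/acme/flows.git",  # hypothetical repo
        entrypoint="flows.py:submit_flow",
    ).deploy(name="submit", work_pool_name="io-pool")  # high concurrency, lightweight

    flow.from_source(
        source="https://github.com/acme/flows.git",
        entrypoint="flows.py:process_flow",
    ).deploy(name="results", work_pool_name="cpu-pool")  # concurrency ~= CPU cores
```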
Notes
- Async/await for the API call can make it non-blocking within a single flow run, but the flow run still occupies a worker slot. To truly free worker capacity, use suspend/pause or the "split flows" pattern.
- You can also scale horizontally by adding more workers to the same work pool; just be careful to limit concurrency for the CPU-bound pool to avoid overloading the machine.

A couple of clarifying questions to tailor the recommendation:
- Is the "10 minutes" a fixed delay, or could you resume as soon as an external callback/webhook arrives?
- Are you running in Prefect Cloud (so we can use Automations to resume), or OSS?
- How parallelizable is the CPU-bound step (and how many cores are on the worker hosts)?

Happy to help with concrete deployment and automation setup once I know the above.
@Marvin The 10 minutes is not a fixed delay; it could be up to 10 minutes depending on the input, or even 1 second if the input is small. No, we are self-hosted. The CPU-bound step is parallelizable.
thought for 73.1 seconds
Got it: since the wait can be anywhere from 1 second to ~10 minutes and you're self-hosting, the best approach is event-driven suspension/resume so the wait does not hold a worker slot. Then, run the CPU work in parallel with a process pool. Here are two patterns that work well in Prefect 3.x:

1) Event-driven suspend/resume (frees the worker immediately)
- Submit the external job and include your current `flow_run_id` in the job's callback URL.
- Call `suspend_flow_run(wait_for_input=...)` to put the flow in a Suspended state and release the worker.
- When your external system finishes, your webhook handler calls `resume_flow_run(...)` with the `flow_run_id` and the run input to immediately resume the flow and continue.

Flow:
```python
from pydantic import BaseModel
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run
from prefect.runtime import flow_run

# Typed input you expect when resuming
class JobReady(BaseModel):
    payload_url: str

def submit_job(callback_url: str) -> str:
    # Submit to external system, include callback_url; return job_id
    ...

def fetch_result(url: str):
    ...

def process(result):
    ...

@flow
def my_flow():
    logger = get_run_logger()
    fr_id = flow_run.id  # current flow run ID (an attribute, not a call)
    callback_url = f"https://my-service/callback?flow_run_id={fr_id}"
    job_id = submit_job(callback_url=callback_url)
    logger.info(f"Submitted job {job_id}; suspending until callback")

    # Free the worker until input arrives; set timeout=None for indefinite wait
    ready: JobReady = suspend_flow_run(wait_for_input=JobReady, timeout=None)

    # Resumes here when your webhook sends input
    result = fetch_result(ready.payload_url)
    process(result)
```

Webhook handler (runs in your API service):
```python
from prefect.flow_runs import resume_flow_run

def handle_callback(flow_run_id: str, payload_url: str):
    # Resume the suspended flow immediately, passing the input it waits for
    resume_flow_run(
        flow_run_id,
        run_input={"payload_url": payload_url},
    )
```

Notes:
- Get the current run ID inside the flow via `prefect.runtime.flow_run.id`.
- Use a secret or signed token in the callback to prevent spoofing.
- You can set `timeout` on `suspend_flow_run` if you want a safety net (e.g., mark the run as failed or alert if input never arrives).

Links:
- Interactive (pause/suspend) flows
- Flow run resume API
- Work pools and workers

2) Split "submit" and "process" into separate flows (small polling windows)

If you can't get a callback, run many short, non-blocking "check" runs:
- Flow A submits the job and schedules Flow B to run soon.
- Flow B checks readiness; if not ready, it quickly reschedules itself a bit later and exits (no long wait). When ready, it fetches and processes.

This uses brief runs instead of holding a slot for minutes, allowing your worker to cycle through more runs.

Parallelize the CPU work

Use a process pool to fan out CPU-bound tasks:
```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(chunk):
    ...

def chunk_result(result):
    # split the fetched result into independent chunks
    ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))  # tune to your cores
def process_result(result):
    chunks = chunk_result(result)
    futs = [heavy_compute.submit(c) for c in chunks]
    return [f.result() for f in futs]
```

Why this helps your concurrency
- Suspended/paused flows do not occupy a worker slot, so your worker can accept far more than 10 "in-flight" jobs even with a concurrency limit of 10.
- Use one work pool/worker for submission (high concurrency, lightweight) and another for processing (concurrency ≈ CPU cores) to avoid CPU contention.

If you share:
- whether you can add a webhook/callback from the external system,
- your worker machine core counts,

I can help you wire up the exact suspend/resume flow and tune `ProcessPoolTaskRunner(max_workers=...)`.
Great question. Assuming you're on Prefect 3.x: the cleanest way to avoid tying up a worker slot during a long external wait is to defer the flow run during the wait, then resume it later. That way, your 10-slot work pool isn't "occupied" by runs that are just waiting.

What to do

1) Use `suspend_flow_run` around the long wait (recommended)
- `suspend_flow_run` transitions the run into a Suspended state and frees the worker slot.
- After the timeout (or an explicit resume), the run is rescheduled and picked up again by a worker to continue.
- This allows your worker to process far more than 10 "in-flight" jobs because the waiting runs won't consume concurrency slots.

Example:
```python
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run

def submit_job_to_api() -> str:
    # Your API submission that returns a job_id
    ...

def fetch_results(job_id: str):
    # Your API call to fetch/process results
    ...

def process_cpu_bound(payload):
    # Your CPU-bound processing
    ...

@flow
def my_flow():
    logger = get_run_logger()
    job_id = submit_job_to_api()
    <http://logger.info|logger.info>(f"Submitted job {job_id}; deferring for 10 minutes (frees worker slot)")
    
    # Defer this run for 10 minutes; it will not occupy a worker while waiting
    suspend_flow_run(timeout=600, key=f"wait-for-{job_id}")

    # After timeout or explicit resume, the run is picked back up here
    payload = fetch_results(job_id)
    process_cpu_bound(payload)
```

- Docs: `suspend_flow_run` API

Advanced: if your external system can notify you when ready, you can resume earlier than the timeout using the API/UI and optionally pass input. (The run will consume a worker slot again only when it resumes.)
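For example, a minimal sketch of resuming early; the handler name and payload are illustrative assumptions:
```python
from prefect.flow_runs import resume_flow_run

def notify_ready(flow_run_id: str, payload_url: str) -> None:
    # resume the suspended run ahead of its timeout; run_input applies only
    # if the flow suspended with wait_for_input=...
    resume_flow_run(flow_run_id, run_input={"payload_url": payload_url})
```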
2) If you must poll until ready, use repeated deferrals
- Instead of `time.sleep`, loop with small `suspend_flow_run` timeouts to check status. Each suspend frees the worker between polls.
```python
# inside the flow, after submitting the job
while True:
    if check_status(job_id) == "ready":
        break
    # each suspend releases the worker slot until the next poll
    suspend_flow_run(timeout=60, key=f"poll-{job_id}")
```

3) Consider splitting into two flows (optional architecture)
- Flow A: submit the job and exit (or suspend until ready).
- Flow B: do the CPU-bound processing when the job is ready (triggered by an automation or an explicit resume/`run_deployment`).
- This lets you place CPU-heavy work into a separate work pool with its own (smaller) concurrency while allowing many more "waiting" jobs to exist without consuming slots.

Other notes
- Simply making the API call "async" won't help with work-pool concurrency. An async `await asyncio.sleep(600)` still occupies a flow-run slot. Deferral via `suspend_flow_run` is what frees the slot.
- If you truly need more actively running flows (not just waiting), increase the work pool concurrency or add more workers. But for long waits, deferral scales better.
- You can also add global concurrency limits for the CPU-bound step if you need to protect resources: Global concurrency limits (a sketch follows below).

Would you prefer a time-based resume (e.g., after 10 minutes), or can your external system call back to resume the flow when the job is ready? I can share a snippet for resuming via the Prefect API if that helps.
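As a sketch of such a limit, assuming a global concurrency limit named "cpu-crunch" has already been created (via the UI or CLI):
```python
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def heavy_compute(chunk):
    # hold one slot of the shared "cpu-crunch" limit while crunching
    with concurrency("cpu-crunch", occupy=1):
        ...
```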
@Marvin The suspend-and-resume approach does not look promising at first glance; is there any other way?
thought for 80.4 seconds
Totally fair. Two solid alternatives that don't use suspend/resume:

Option A: Webhook-triggered processing (no waiting inside Prefect)
- Submit the API job and exit immediately (worker slot is freed).
- When the external system finishes, your webhook calls Prefect to start the processing flow with the `job_id`.
- This gives you near-zero latency when the job is ready (whether that takes 1 second or 10 minutes), and your work-pool concurrency isn't consumed while waiting.

Flow to submit:
```python
from prefect import flow, get_run_logger

def submit_job_to_api() -> str:
    # submit to external system and return job_id
    ...

@flow
def submit_flow():
    logger = get_run_logger()
    job_id = submit_job_to_api()
    <http://logger.info|logger.info>(f"Submitted external job: {job_id}")
    # done — worker slot is immediately free
```

Webhook handler (your API service):
```python
from prefect.deployments import run_deployment

def handle_external_callback(job_id: str):
    # Trigger the processing flow immediately when the job is ready
    run_deployment(
        name="process/results",   # your processing flow's deployment name
        parameters={"job_id": job_id},
        timeout=0,  # fire-and-forget: don't block the webhook handler
    )
```

- Make sure the service that runs `handle_external_callback` has `PREFECT_API_URL` set to your self-hosted Prefect API.
- You can also use the REST API if you prefer HTTP over Python; a sketch follows below.
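For instance, a minimal sketch that hits the server's REST endpoint directly; the helper name and deployment-ID lookup are assumptions:
```python
import os

import httpx

def trigger_processing(job_id: str, deployment_id: str) -> None:
    # create a flow run from the deployment via the self-hosted REST API
    resp = httpx.post(
        f"{os.environ['PREFECT_API_URL']}/deployments/{deployment_id}/create_flow_run",
        json={"parameters": {"job_id": job_id}},
    )
    resp.raise_for_status()
```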
Option B: Self-rescheduling "poller" flow (short runs, no long waits)
- If you can't get a callback, don't sleep for minutes. Instead, run quick "check" flow runs that reschedule themselves and exit when not ready.
- The worker slot is only used for a brief check each time.

Submit + poll:
```python
from datetime import datetime, timedelta
from prefect import flow
from prefect.deployments import run_deployment

def submit_job_to_api() -> str:
    ...

def check_ready(job_id: str) -> bool:
    ...

@flow
def submit_flow():
    job_id = submit_job_to_api()
    # Kick off the first check soon
    run_deployment(
        name="poll-or-process",  # "<flow-name>/<deployment-name>" of the poller
        parameters={"job_id": job_id},
        scheduled_time=datetime.utcnow() + timedelta(seconds=5),
        timeout=0,  # don't wait for the check run to finish
    )

@flow
def poll_or_process(job_id: str):
    if check_ready(job_id):
        process_flow(job_id)   # see below
    else:
        # Re-schedule a short, non-blocking check and exit
        run_deployment(
            name="poll-or-process",
            parameters={"job_id": job_id},
            scheduled_time=datetime.utcnow() + timedelta(seconds=15),
            timeout=0,  # exit immediately; the next run does the next check
        )
```

Parallelize the CPU-bound step
- Put the heavy work in a dedicated processing flow with a process pool.
- Run that flow in a separate work pool whose concurrency roughly matches your CPU cores, so you don't overload the machine even if you have many "waiting" jobs.
```python
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(chunk):
    ...

def fetch_results(job_id: str):
    ...

def chunkify(data):
    # split the fetched data into independent chunks
    ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))  # tune to core count
def process_flow(job_id: str):
    data = fetch_results(job_id)
    chunks = chunkify(data)
    futures = [heavy_compute.submit(c) for c in chunks]
    results = [f.result() for f in futures]
    return results
```

Why these work
- No long-running sleeps. Worker slots are used only for brief submits or checks, so your 10-slot pool can juggle hundreds of "in-flight" jobs.
- The processing stage is isolated and tuned for CPU, either by its own work pool or via `ProcessPoolTaskRunner(max_workers=...)`.
- If your API can provide an ETA, schedule the next check closer to the expected ready time to reduce polling; see the sketch below.
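For example, a minimal sketch of ETA-aware rescheduling, where `eta_seconds` is an assumed field returned by your external system:
```python
from datetime import datetime, timedelta, timezone

from prefect.deployments import run_deployment

def schedule_next_check(job_id: str, eta_seconds: int) -> None:
    # schedule the next readiness check close to the expected ready time
    run_deployment(
        name="poll-or-process",
        parameters={"job_id": job_id},
        scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=eta_seconds),
        timeout=0,  # don't wait for the scheduled run
    )
```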
Links for reference:
- Work pools and workers
- Global concurrency limits (optional, to protect CPUs if needed)

Quick follow-ups:
- Can your external system call a webhook? If yes, Option A is simplest and most responsive.
- How many cores do your worker hosts have? I can suggest a `max_workers` value and work-pool concurrency for the processing step.
@Marvin These solutions have some problems for our use case. Can making the submit task async help here?
thought for 49.4 seconds
Short answer: making the submit step async will not let a worker process more than your work pool's concurrency limit. A flow run occupies a worker slot while it's running, even if it's "awaiting" an async operation. So async won't free the slot or increase the number of simultaneous flow runs beyond 10.

Where async can help:
- If you can batch multiple inputs into a single flow run, you can use async to submit and await many API jobs concurrently within that one run. That way, one worker slot handles many "in-flight" jobs. Then, once ready, run the CPU-bound processing in parallel (process pool) within the same flow run.

Sketch:
```python
import asyncio
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

# --- I/O stage: async submission + readiness check ---
async def submit_async(item) -> str:
    # call external API asynchronously; return job_id
    ...

async def wait_until_ready_async(job_id: str):
    # efficient async polling/backoff until ready
    ...

@task
async def submit_and_wait(item):
    job_id = await submit_async(item)
    await wait_until_ready_async(job_id)
    return job_id

# --- CPU stage: parallelize with processes ---
@task
def heavy_compute(chunk):
    ...

def chunkify(result): ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))  # tune to cores
async def batch_flow(items: list):
    # 1) Fan out many API jobs concurrently in ONE flow run.
    #    Note: .fn calls the raw function, bypassing task orchestration; call
    #    submit_and_wait(i) directly instead if you want task runs recorded.
    ready_job_ids = await asyncio.gather(*(submit_and_wait.fn(i) for i in items))

    # 2) Fetch + process each result; CPU step uses multiple processes
    results = []
    for jid in ready_job_ids:
        data = ...  # fetch result for jid (can be async too)
        chunks = chunkify(data)
        futures = [heavy_compute.submit(c) for c in chunks]
        results.extend([f.result() for f in futures])

    return results
```

Other ways to raise throughput without suspend/resume:
- Increase work pool concurrency and/or add more workers so you can run more than 10 flow runs concurrently. This does not reduce waiting cost; it just scales out capacity.
- Move the waiting out of Prefect entirely (an external orchestrator plus a callback that triggers the processing flow); you said this has issues for your use case, so mentioning it for completeness.

Key takeaway
- Async helps you pack more I/O into a single running flow, but it does not free the worker slot. If you must keep "one input per flow run" and you won't use suspend/resume or self-rescheduling, the only way to run more than 10 simultaneous runs is to raise the pool's concurrency or add workers.
- If batching is viable, async batching + process-pool CPU parallelism in a single run is the best way to dramatically increase throughput without changing your work-pool concurrency.

Would batching multiple inputs per flow run be acceptable for your workload? If so, how many items could you group per run, and how many cores are on your worker hosts? I can help you tune `max_workers` and the batching pattern.

For reference: Work pools and workers.
@Marvin I heard there is some concept of a "deferral task" in Prefect?
thought for 40.7 seconds
Great question: "deferral" in Prefect typically refers to deferring a flow run's execution, not a single task. In Prefect 3.x:
- There isn't a server-orchestrated "defer this task" primitive. Tasks run in the flow process; they can't be individually put to sleep on the server and resumed later.
- Deferral is done at the flow level (pause/suspend), or by rescheduling new flow runs. Those are the two ways to avoid holding a worker slot during a long wait.

If suspend/resume doesn't fit, the closest "deferral" pattern without it is self-rescheduling:
- Have your flow exit quickly when the job isn't ready and schedule itself to check again later. Each check is a short run that doesn't block a slot for minutes.

Example:
```python
from datetime import datetime, timedelta
from prefect import flow
from prefect.deployments import run_deployment

def submit_job_to_api() -> str: ...
def check_ready(job_id: str) -> bool: ...
def fetch_result(job_id: str): ...
def process(result): ...

@flow
def submit_flow():
    job_id = submit_job_to_api()
    run_deployment(
        name="poll-or-process",
        parameters={"job_id": job_id},
        scheduled_time=datetime.utcnow() + timedelta(seconds=5),
        timeout=0,  # fire-and-forget; don't wait for the check run
    )

@flow
def poll_or_process(job_id: str):
    if check_ready(job_id):
        result = fetch_result(job_id)
        process(result)
    else:
        run_deployment(
            name="poll-or-process",
            parameters={"job_id": job_id},
            scheduled_time=datetime.utcnow() + timedelta(seconds=15),
            timeout=0,  # reschedule and exit immediately
        )
```

If you must keep "one input per flow run" and you won't use suspend/resume:
- Async won't free the worker slot. It can help only if you batch many items into a single flow run and await many API calls concurrently inside that single run.
- Otherwise, to run more than 10 simultaneous "waiting" flows, you'd need to increase work-pool concurrency or add workers, which scales cost rather than eliminating idle time.

References:
- Interactive (pause/suspend) flows
- Work pools and workers

If you can share what specifically makes suspend/resume a non-starter (e.g., infra, security, or operational constraints), I can suggest a variant that fits those constraints (e.g., timeout-only deferral, event/webhook triggers, or mixed polling with backoff).