<@ULVA73B9P> I have some questions about prefect
# marvin-ai
d
@Marvin I have some questions about prefect
m
thought for 16.0 seconds
Absolutely—happy to help! What are you working on with Prefect? A few details will help me give spot-on guidance:
- Which Prefect version are you using (3.x or 2.x)?
- Where are you running things (local, Docker, Kubernetes, Cloud vs. self-hosted)?
- Any code snippets, deployment configs, or error messages you can share?

If you're just getting started or want references:
- Docs home: Prefect documentation
- Deployments in 3.x: use flow.from_source(...).deploy(...) instead of the 2.x build commands

Fire away with your questions!
d
Great, can you tell me about how Prefect runs - particularly how it would be best to parallelise data flows @Marvin
m
thought for 451.3 seconds
Great question. Here's a practical overview of how Prefect runs and the main ways to parallelize in Prefect 3.x.

How Prefect runs (high-level)
- Orchestration vs execution: Prefect Cloud/Server orchestrates runs and state; your code executes wherever your worker runs (local machine, Docker, Kubernetes, etc.).
- Flow run process: a single flow run executes in one process/container/pod. Inside that process, tasks can run concurrently via a task runner.
- Task runners: control intra-flow parallelism. Use threads or processes depending on workload.
- Horizontal scale: multiple flow runs can run in parallel across many workers and/or infrastructure.

Parallelism options

1) Parallelize inside a single flow (tasks in parallel)
- Best for: speeding up a single flow by running many tasks concurrently.
- Pick a task runner at the flow level:
  - I/O-bound work: ThreadPoolTaskRunner
  - CPU-bound work: ProcessPoolTaskRunner (functions must be pickleable and avoid large global state)
- Pattern: submit tasks without awaiting each immediately, then gather results.

Example (I/O-bound) with threads
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect import futures

@task
def fetch(i):
    # call API, read a file, DB query, etc.
    return i * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=20))
def my_flow(n: int = 100):
    futs = [fetch.submit(i) for i in range(n)]  # .submit() schedules the task and returns a PrefectFuture
    # optionally stream results as they complete
    for f in futures.as_completed(futs):
        _ = f.result()  # handle result
    # or wait for all at once
    results = [f.result() for f in futs]
    return results
CPU-bound variant with processes
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def heavy_compute(i):
    # pure CPU work
    return i ** 2

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def my_cpu_flow(n: int = 1000):
    futs = [heavy_compute.submit(i) for i in range(n)]  # .submit() returns PrefectFutures
    return [f.result() for f in futs]
Notes
- Max concurrency per flow is bounded by max_workers.
- For ProcessPoolTaskRunner, tasks must be importable at module top level and arguments/results must be pickleable.

2) Parallelize across flow runs (horizontal scale)
- Best for: running many independent flow runs (e.g., per-partition, per-customer, per-day) in parallel across machines/containers.
- How:
  - Create a deployment for your flow and a work pool; run one or more workers that pull work.
  - Launch multiple flow runs via a schedule, an automation, or programmatically (see the sketch after the CLI list below).
  - Scale by running more workers and/or increasing work pool concurrency.

Useful CLI we verified exists
- Work pool concurrency (limit how many flow runs run at once):
  - Set limit:
prefect work-pool set-concurrency-limit <pool-name> <limit>
- Clear limit:
prefect work-pool clear-concurrency-limit <pool-name>
- Deploy flows:
prefect deploy --help
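For the "programmatically" option, here's a minimal sketch of fanning out many flow runs from a deployment (the deployment name and partition parameters are illustrative):
Copy code
from prefect.deployments.flow_runs import run_deployment

# Kick off one flow run per partition; each run is picked up by whatever
# workers are polling the deployment's work pool.
for partition in ["2024-01-01", "2024-01-02", "2024-01-03"]:
    run_deployment(
        name="my-flow/my-deployment",   # "<flow name>/<deployment name>"
        parameters={"partition": partition},
        timeout=0,  # return immediately rather than waiting for each run to finish
    )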
3) Throttle shared resources with central concurrency limits
- Best for: preventing overload of downstream services (APIs, DBs) across all flows and workers.
- Tag your tasks and set a concurrency limit for that tag. Prefect will enforce slots globally.
Copy code
from prefect import task

@task(tags=["db"])
def write_to_db(row):
    ...
Create a tag limit with the CLI - Create:
prefect concurrency-limit create db 5
- Inspect:
prefect concurrency-limit inspect db
- List:
prefect concurrency-limit ls
- Reset/delete:
prefect concurrency-limit reset db
,
prefect concurrency-limit delete db
Optional: fine-grained in-code guards - Use the concurrency context manager inside a task to gate a critical section (useful when you can’t tag the whole task):
Copy code
from prefect.concurrency.sync import concurrency

@task
def call_api_safe(i):
    with concurrency("external-api", occupy=1, timeout_seconds=60):
        # only N of these sections run at once (based on the configured limit)
        ...
- There's also a rate limiter if you need requests-per-second control.

Choosing an approach
- Single big flow you just want to run faster: use ThreadPoolTaskRunner or ProcessPoolTaskRunner.
- Many independent units of work: create a deployment and run many flow runs in parallel; scale workers/infrastructure.
- Protect shared services: use tag-based concurrency limits and/or in-code concurrency guards.

Quick links
- Prefect docs: Prefect documentation
- Deployments overview: Deployments and infrastructure
- Concurrency and limits: Concurrency limits
- Task runners: Task runners

A couple of clarifying questions to tailor this to you:
- Is your workload primarily I/O-bound (APIs, DB, files) or CPU-bound (heavy computation)?
- Do you want to parallelize within a single flow, run many flow runs in parallel, or both?
- Where will you run this (local, Docker, Kubernetes, Cloud vs. self-hosted)?
d
Can Prefect itself parallelise tasks within a flow across multiple containers / machines? @Marvin
m
thought for 173.8 seconds
Short answer: by itself, a single Prefect flow run executes in one process/container. Tasks within that flow can be parallelized locally (threads/processes). To run tasks across multiple containers/machines within the same flow run, use a distributed task runner integration like Dask or Ray.

What this looks like in Prefect 3.x
- Local concurrency (single machine):
  - ThreadPoolTaskRunner: great for I/O-bound tasks
  - ProcessPoolTaskRunner: great for CPU-bound tasks (multiple processes on the same host)
- Distributed across machines/containers:
  - DaskTaskRunner (from prefect-dask): submit tasks to a Dask cluster (workers can be pods/containers/VMs)
  - RayTaskRunner (from prefect-ray): submit tasks to a Ray cluster

Key point: your flow run still starts in one worker/container, but that process becomes a client of Dask/Ray and offloads tasks to the cluster workers running elsewhere.

Dask example
Copy code
# pip install "prefect-dask dask[distributed]"
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def do_work(i): 
    return i * 2

# connect to an existing Dask scheduler (e.g., K8s, ECS, VM)
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def dist_flow(n: int = 100):
    futs = [do_work.submit(i) for i in range(n)]  # .submit() sends each task to the Dask cluster
    return [f.result() for f in futs]
Ray example
Copy code
# pip install "prefect-ray ray"
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def do_work(i):
    return i * 2

# connect to Ray head (e.g., ray://<head-svc>:10001) or start local Ray if address=None
@flow(task_runner=RayTaskRunner(address="ray://ray-head:10001"))
def dist_flow(n: int = 100):
    with remote_options(num_cpus=1):  # optional per-task resources
        futs = [do_work.submit(i) for i in range(n)]  # .submit() sends each task to the Ray cluster
    return [f.result() for f in futs]
Considerations
- Code and environment: cluster workers must have the same code and Python deps (use a common container image for Dask/Ray workers).
- Serialization: task args/returns must be serializable; keep payloads small. Use shared storage for large data and pass references.
- Networking: your Prefect worker/container must reach the Dask scheduler or Ray head address.
- Orchestration features still apply: retries, tagging, and Prefect's global concurrency limits will gate task starts even on distributed runners.
- Subflows: distributed runners dispatch tasks, not flows. Subflows still execute in the main flow process unless you run them as separate flow runs.

Docs
- Dask integration: Prefect Dask
- Ray integration: Prefect Ray
- Task runners overview: Task runners
- Concurrency limits (useful when distributing): Concurrency limits

If you share where you're running (Docker, Kubernetes, etc.) and whether you already have Dask or Ray available, I can suggest a concrete setup (e.g., Dask on Kubernetes vs. Ray on Kubernetes, addressing, and container images).
d
Do workers poll a database of the work pool directly or do they poll a server? @Marvin
m
thought for 39.6 seconds
In Prefect 3.x, workers poll the Prefect API server (Cloud or your self-hosted Prefect Server), not the database directly.

How it works at a glance:
- The worker authenticates to the API server and polls its work pool via API endpoints for scheduled flow runs.
- When it claims a run, the server updates its database; the worker never talks to the DB itself.
- The worker sends heartbeats, logs, and state transitions back to the API server throughout the run.

Useful references:
- Worker health/heartbeats: Worker health checks
- Architecture overview of work pools/workers: Work pools and workers (blog)
d
How does the prefect server scale? @Marvin
m
thought for 229.2 seconds
Great question. In Prefect 3.x (self-hosted "Prefect Server"), scaling is driven by a stateless API layer and a stateful Postgres database.

What scales and how
- API and UI (stateless)
  - Horizontally scalable: run multiple API/UI replicas behind a load balancer.
  - Add replicas to handle more worker polling, log ingestion, and UI/API traffic.
  - The API process can run "services" (scheduler, late-run handler, etc.). You can:
    - Run services on all replicas (they coordinate via the database so duplicate work is avoided), or
    - Designate one replica for services and start the others with no services, for pure API handling.
- Database (stateful)
  - Postgres is the source of truth for runs, states, logs, artifacts, etc.
  - Scale Postgres separately (bigger instance, more IOPS, connection poolers like pgbouncer, read replicas for analytics).
  - Most large-scale bottlenecks are DB-related; index and vacuum heavy tables (logs, states), and consider partitioning if volume is high.

Operational guidance
- For API/UI:
  - Run 2-3+ replicas behind a load balancer; autoscale on CPU/memory or request rate.
  - Tune webserver workers and keep-alive timeouts for your traffic profile.
- For services:
  - Either run with services enabled on each replica (simple) or dedicate one "services" pod/instance and disable services elsewhere.
  - Services are idempotent and coordinate through the DB.
- For Postgres:
  - Use managed Postgres if possible; ensure sufficient RAM/IOPS.
  - Add pgbouncer if you have many workers or API replicas to smooth connection spikes.
  - Monitor table growth (especially logs and states). Partitioning high-churn tables can help at large scale.
- For workload patterns:
  - Many workers mean more API polling; scale API replicas accordingly.
  - Heavy log/event volume increases DB write pressure; consider lowering log level, sampling, or externalizing heavy data to object storage and storing references.

Notes
- Prefect Cloud: you don't manage the control-plane scaling; just scale your workers/infrastructure. The guidance above applies to self-hosting.
- No Redis is required; state is persisted in Postgres.

Helpful links
- Server overview: Prefect Server concepts
- Self-hosting guide: Self-hosted Prefect
- Postgres scaling/partitioning considerations: Database partitioning for Postgres

If you're self-hosting, are you planning to deploy on Kubernetes, VMs, or Docker Compose? I can suggest a concrete topology (replica counts, services strategy, and DB sizing) for your setup.
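A minimal sketch of that replica split on plain VMs/containers (the database setting is the standard one, but verify the flags against your Prefect version; hosts, ports, and credentials are illustrative):
Copy code
# All replicas must point at the same Postgres instance
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@db:5432/prefect"

# Replica 1: serves the API and runs the background services
prefect server start --host 0.0.0.0 --port 4200

# Replicas 2..N: API only; put all replicas behind a load balancer
prefect server start --host 0.0.0.0 --port 4201 --no-services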
d
How do I specify how many flows I run on a worker? Can this scale automatically based on memory usage? @Marvin
m
thought for 293.3 seconds
Short answer
- Per-worker: set a concurrency limit on the worker so it only executes N flow runs at a time.
- Pool-wide: set a concurrency limit on the work pool to cap total concurrent flow runs across all workers in that pool.
- Autoscaling on memory: not built into Prefect. Use your infrastructure (e.g., a Kubernetes HPA) or an external controller to adjust worker replicas or pool concurrency based on memory.

How to set it
- Per worker
  - Programmatic (example with ProcessWorker):
Copy code
from prefect.workers.process import ProcessWorker

worker = ProcessWorker(
    work_pool_name="my-pool",
    limit=2,  # max concurrent flow runs on this worker
)
worker.start()
  - CLI: the worker start command accepts a per-worker concurrency limit (check prefect worker start --help in your environment). It maps to the same limit you see in the worker classes.
- Per work pool
  - Cap concurrency across all workers in the pool:
Copy code
prefect work-pool set-concurrency-limit "my-pool" 10
This prevents more than 10 flow runs from starting at once, regardless of how many workers are running. Link: Work pools

Notes by worker type
- Process worker: runs flows locally in subprocesses; the per-worker limit is the number of simultaneous subprocesses.
- Docker/Kubernetes/ECS workers: launch remote jobs (containers/pods/tasks). The per-worker limit controls how many flow runs the worker will launch/manage at once; memory is consumed in the launched jobs, not by the worker process itself.

Autoscaling based on memory
- Not natively handled by Prefect. Recommended approaches:
  - Kubernetes: use an HPA to scale worker replicas on CPU/memory, and set a work-pool concurrency limit to cap global throughput. Also assign resource requests/limits to flow-run pods in your job template so the scheduler enforces memory per run.
  - Other platforms (ECS/VMs): use your platform's autoscaling on service/container memory/CPU; adjust worker counts and optionally change work-pool concurrency via automation.
  - If you want "memory-aware" concurrency, run a small controller that observes memory usage and updates the work pool's concurrency with:
Copy code
prefect work-pool set-concurrency-limit "my-pool" <new-limit>
or via the Prefect API.

Related docs
- Work pools and workers: Work pools
- Global concurrency limits: Global concurrency limits
- Kubernetes integration: Prefect Kubernetes

If you share which worker type you're using (process, Docker, Kubernetes, ECS) and where you run it, I can give a concrete configuration and autoscaling pattern.
d
OK nice. If a flow run is calling an API that is rate limited at a global level, such that I might be waiting a long time to get a token, what should I do in Prefect-world to prevent that worker running and taking up concurrency when it is just idle? @Marvin
m
thought for 158.8 seconds
You've got a few good options to avoid "burning" a worker slot when an API is globally rate-limited and you may be waiting a long time.

Option A: Suspend the flow run while you wait (frees the worker slot)
- Use suspend_flow_run from inside your flow when you detect you can't proceed (e.g., no token available or a global rate gate reached). This ends the running infrastructure process and returns the run to a Suspended state; the worker slot is freed. Later, you can resume it automatically or manually.
- Pattern:
  1) Try to acquire permission/a token.
  2) If not available, suspend with a timeout or wait-for-input.
  3) Resume when capacity exists (via an Automation, a webhook, or another process).

Example
Copy code
from prefect import flow, get_run_logger
from prefect.flow_runs import suspend_flow_run, resume_flow_run  # resume usually triggered externally

def token_available() -> bool:
    # your global token gate check
    return False

@flow
def api_flow():
    log = get_run_logger()
    if not token_available():
        <http://log.info|log.info>("Capacity not available — suspending run to free worker.")
        suspend_flow_run(timeout=3600)  # stop infra; worker slot freed
        # execution will resume here when resumed
    <http://log.info|log.info>("Proceeding with API call...")
    # call API now
How to resume
- Automation: create an Automation that resumes suspended runs when your "capacity available" condition is met (e.g., webhook, time window, or custom signal).
- Manual: use the UI or prefect flow-run resume <id>.

Option B: Use global concurrency/rate limits to gate starts
- Tag the tasks/flows that hit the API and set a global concurrency limit so only N run at once across your whole workspace. Prefect will queue runs and won't start more than N, preventing extra runs from starting at all.
- This avoids starting runs that would just wait, but if your flow starts and then hits a long wait mid-run, Option A is better.

CLI for limits (verified)
- Create tag limit:
prefect concurrency-limit create external-api 2
- Inspect:
prefect concurrency-limit inspect external-api
- List:
prefect concurrency-limit ls
Tag your task
Copy code
from prefect import task

@task(tags=["external-api"])
def call_api():
    ...
Option C: Explicit concurrency guard + suspend
- If you need to start the run (e.g., parameterized schedule) but want to immediately release the slot when there's no capacity:
  - Attempt to acquire capacity (your own token service or a Prefect concurrency context).
  - If not acquired promptly, call suspend_flow_run() and exit.

Notes and trade-offs
- Suspend vs pause: suspend_flow_run ends the infra/worker process and truly frees the slot; pause_flow_run generally keeps the run "waiting" and is better for short waits. For long or unpredictable waits, use suspend.
- Resumption strategy is key: pair suspension with an Automation or a small controller that resumes when your token service says "go."
- If you can predict a window (e.g., "after 5 minutes"), consider scheduling resumption with an Automation instead of constant polling.

Related links
- Concurrency limits: Concurrency limits
- Rate limits (concept): Rate limits
- Flow suspension/resume reference: Server concepts (and prefect.flow_runs.suspend_flow_run in the API)

If you share how you determine "token available" (own service, third-party API headers, etc.), I can sketch the Automation to resume runs and the minimal code needed to wire this up.
d
Right, what's the recommended way to implement rate limiting for external services across multiple flow runs without limiting the number of flows I run at one time @Marvin
m
thought for 86.3 seconds
Goal: allow many flow runs to start, but centrally throttle calls to a specific external API across all of them.

Recommended pattern in Prefect 3.x
- Use global/tag-based limits for concurrency, plus optional rate limits for requests-per-second behavior. This controls only the API-call throughput, not how many flows run overall.

Two building blocks

1) Tag-based concurrency limit (max concurrent calls)
- Tag tasks that call the API and set a global limit for that tag. Tasks exceeding the limit won't enter Running until a slot is free. Your other tasks and flows continue unaffected.
Copy code
# Set once (CLI)
prefect concurrency-limit create external-api 3  # at most 3 concurrent calls

# In code
from prefect import task

@task(tags=["external-api"])
def call_external_api(payload):
    ...
2) Global rate limit (tokens per second/minute)
- For true request-rate control, create a global concurrency limit with slot decay (token bucket), or use the rate_limit helper to pace calls.
- CLI for a token-bucket style limit (verified):
Copy code
prefect global-concurrency-limit create external-api-rps --limit 10 --slot-decay-per-second 10
# Allows ~10 tokens per second across all tasks that consume this limit name
- Consume tokens in code when making a call:
Copy code
from prefect.concurrency.sync import rate_limit

def call_with_rate_limit(...):
    # rate_limit blocks until a token is available (it's a plain call, not a context manager)
    rate_limit("external-api-rps", occupy=1, timeout_seconds=30)
    # make exactly one request here
    ...
- Combine with the tag-based limit to bound bursts and smooth the sustained rate:
  - the tag enforces "at most N simultaneous requests"
  - rate_limit enforces "no more than R requests per second"

Putting it together
- Simple, concurrency-only: use just the task tag + prefect concurrency-limit create external-api N
- Concurrency + RPS:
  - tag the task with "external-api" to cap simultaneous calls
  - call rate_limit("external-api-rps", occupy=1) before each request for RPS control
- This does not cap how many flow runs start; it just gates the API-call sections.

Avoid holding worker slots when waiting
- If a flow reaches the API section and must wait a long time (e.g., an external token window), call suspend_flow_run() before the API step and resume later when your window opens. This frees the worker slot entirely while preserving flow progress.

Handy links
- Tag-based limits: Concurrency limits
- Global limits (token bucket, slot decay): Global concurrency limits
- Rate limits concept: Rate limits

If you share your target policy (e.g., "max 5 concurrent, 20 req/s sustained, burst 50"), I can suggest concrete CLI values and code snippets.
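For instance, a policy like "max 5 concurrent calls, roughly 20 requests/second sustained" might look like this (values illustrative; same commands as above):
Copy code
prefect concurrency-limit create external-api 5
prefect global-concurrency-limit create external-api-rps --limit 20 --slot-decay-per-second 20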
d
suspend_flow_run seems like a nice idea, but what actually happens if I have, say, 100k flows suspended - where are they all stored and how are they reactivated? @Marvin
m
thought for 129.0 seconds
Great question—suspension is meant to be lightweight at scale.

What happens on suspend
- State change only: suspend_flow_run transitions the run to a Suspended state in the Prefect API/DB. No infrastructure is kept running; the worker/pod/container for that run exits. This frees the worker slot and any job resources.
- Persistence: the run record and its state are stored in the Prefect database (Postgres for self-hosted, Prefect's control plane in Cloud). You can have 100k+ suspended runs; they're just rows plus state history/metadata.

Where they "live"
- In the DB: flow_run + flow_run_state rows (and related metadata like run inputs if you used wait_for_input). There's no per-run process or in-memory hold.

How they're reactivated
- Explicit resume: call resume_flow_run(flow_run_id) via code, CLI, or API.
- CLI example:
prefect flow-run resume <flow_run_id>
- Automations: set an Automation that listens for an external signal or time window and calls "Resume Flow Run" when your condition is met (e.g., your token service posts a webhook, or a cron window opens).
- Timeout behavior: if you used a timeout with suspend_flow_run, you can detect expiry via Automations and either resume, cancel, or notify.

Scaling considerations for 100k suspended runs
- DB hygiene: this is primarily a database scaling question.
  - Use a managed Postgres with sufficient storage/IOPS.
  - Index on flow_run.state_type, updated timestamps, etc., to list/filter Suspended runs efficiently.
  - Prune old state history/logs if retention policies allow.
  - For very large installations, consider table partitioning for high-churn tables (logs, states).
- Resume fan-out control: when it's time to resume many runs, throttle resumption to avoid a sudden thundering herd.
  - Use work-pool concurrency limits to cap in-flight resumes.
  - Batch resumes in waves (Automations or a small controller service) and/or stagger with jitter.
- Observability: emit events/metrics on suspends/resumes so you can monitor queue depth and resume rates.

When to prefer suspend vs. gating starts
- Long, unpredictable waits: suspend_flow_run is ideal; no resources are held.
- Short waits or simple throughput control: rely on tag/global concurrency limits or rate limits to keep runs queued or throttled before they begin the API-call section.

If you share your expected suspended-run volume and resume burst profile (e.g., "resume up to 500/min"), I can suggest concrete indexes, Automation patterns, and resume batching code.
d
Maybe a queue would be a better way to manage this - is there a way I can have a global queue for tasks waiting to get through an API? @Marvin
m
thought for 79.3 seconds
Short answer: Prefect doesn't provide a separate FIFO message-queue primitive for API calls, but you can get "global queue" behavior two ways:

1) Built-in global gates (simple, no extra infra)
- Use a named global limit as the gate and let tasks wait for a slot/token.
- Concurrency cap (max in-flight requests): tag the task and set a concurrency limit for that tag.
Copy code
prefect concurrency-limit create external-api 5
Then tag your task:
Copy code
from prefect import task

@task(tags=["external-api"])
def call_api(...):
    ...
Tasks beyond 5 will queue in AwaitingConcurrencySlot; flows continue to run but will block at that task.
- Token-bucket rate limit (req/s) with global-concurrency-limit:
Copy code
prefect global-concurrency-limit create external-api-rps --limit 10 --slot-decay-per-second 10
In code, consume tokens:
Copy code
from prefect.concurrency.sync import rate_limit

def call_with_rps(...):
    # rate_limit blocks until a token is available (plain call, not a context manager)
    rate_limit("external-api-rps", occupy=1, timeout_seconds=2)
    # make exactly one request here
    ...
- Caveat: this "queue" is orchestration-level; the flow run stays active while waiting for a slot. If you need to free the worker/infrastructure, combine with suspension (below).

2) Free the worker while queued (decoupled, still centralized)
- Try-acquire-then-suspend pattern:
  - Attempt to acquire a token/slot quickly; if it times out, suspend the flow so it doesn't hold a worker slot. Resume later via an Automation or webhook.
Copy code
from prefect import flow, get_run_logger
from prefect.concurrency.sync import rate_limit
from prefect.flow_runs import suspend_flow_run

@flow
def my_flow(payload):
    log = get_run_logger()
    try:
        # rate_limit blocks until a token is available; raises if the timeout elapses first
        rate_limit("external-api-rps", occupy=1, timeout_seconds=0.25)
        return do_request(payload)
    except Exception:
        log.info("No token available; suspending to free resources.")
        suspend_flow_run(timeout=900)
        # On resume, try again:
        rate_limit("external-api-rps", occupy=1, timeout_seconds=2)
        return do_request(payload)
- Resume strategy: use a scheduled Automation (resume N/s) or a small controller that resumes suspended runs when tokens are available, to avoid a thundering herd.

3) Strict FIFO/durable queue (when ordering/durability matters)
- Use an external queue (SQS, Pub/Sub, Redis) as the single "API gateway" queue.
- Main flows enqueue requests and optionally suspend.
- A dedicated Prefect deployment consumes the queue at controlled concurrency/RPS (using the same Prefect limits), and notifies or resumes requestors on completion.
- This gives you real FIFO and decouples waiting entirely from the main flows.

Which to pick
- Need a simple global throttle and don't mind flows idling: use tag-based/global limits.
- Need to release worker resources during long waits: try-acquire-then-suspend + resume.
- Need strict FIFO and durable queuing: external message queue + a dedicated consumer deployment.

Docs
- Concurrency limits: Concurrency limits
- Global limits (token bucket): Global concurrency limits
- Rate limits concept: Rate limits

If you share your exact policy (e.g., "burst 20, sustained 5 r/s" and whether you must free workers), I can sketch a concrete config (limits + Automations) or an SQS-based pattern.
d
Tell me more about the automations to resume paused flows. @Marvin
m
thought for 945.9 seconds
Automations are Prefect's event-driven rules: "When X happens, do Y." They're perfect for resuming flows you've paused/suspended to free worker slots.

What you can trigger on
- Flow run events: e.g., "flow run entered Suspended state", "flow run is older than N minutes", tagged runs, a specific flow/deployment.
- Custom events: from your systems (e.g., "API capacity available"). You can emit a custom event and have an Automation react to it.
- Composite/sequence triggers: combine multiple conditions.

Common patterns to resume paused/suspended flows

1) Auto-resume on external capacity signal
- Your rate-limit or token service posts a webhook or emits a custom event when capacity is available.
- An Automation with an Event Trigger (matching your event) runs the "Resume Flow Run" action targeting runs you want to drain (by tag, flow, deployment, etc.).
- Emit a custom event from anywhere:
Copy code
from prefect.events.utilities import emit_event

emit_event(
    event="external.api.capacity.available",
    resource={"prefect.resource.id": "global/external-api"},  # your identifier
    payload={"slots": 5}  # optional, can be used by automations for filtering
)
2) Window-based drain
- Create an Automation that runs on a schedule or when flows have been Suspended for > X minutes. Action: "Resume Flow Run". Useful for retry windows or quiet periods.

3) Try-acquire-then-suspend loop
- In your flow, attempt to acquire a rate-limit token quickly. If unavailable, suspend_flow_run(timeout=...) to free infra.
- An Automation resumes runs when capacity returns.
Copy code
from prefect import flow, get_run_logger
from prefect.concurrency.sync import rate_limit
from prefect.flow_runs import suspend_flow_run

@flow
def api_flow():
    log = get_run_logger()
    try:
        # rate_limit blocks until a token is available; raises if the timeout elapses first
        rate_limit("external-api-rps", occupy=1, timeout_seconds=0.2)
        return make_request()
    except Exception:
        log.info("No token; suspending to free worker")
        suspend_flow_run(timeout=900)  # resume later via Automation
        rate_limit("external-api-rps", occupy=1, timeout_seconds=2)
        return make_request()
How resumption works
- The "Resume Flow Run" action transitions the run out of Suspended. Workers pick it up and the flow re-executes; tasks that already completed with persisted results are not re-run.
- If you used suspend_flow_run(wait_for_input=...), a programmatic resume can pass `run_input`:
Copy code
from prefect.flow_runs import resume_flow_run
resume_flow_run(flow_run_id, run_input={"token": "..."})
- In the UI, manual resume can include input; Automations can also pass input derived from event payloads.

Targeting and avoiding thundering herds
- Filter the Automation to only resume runs with specific tags (e.g., external-api), flows, or deployments.
- Combine with work pool concurrency limits to cap how many resumes turn into active runs at once.
- If you need strict "resume N per minute," use a small controller (cron or webhook handler) that resumes in batches using the API.

Setup in the UI (typical)
- Automations → New Automation
- Trigger: "When a flow run enters state = Suspended" (optionally AND "has tag = external-api")
- Action: "Resume Flow Run" (optionally add a delay or additional actions like Send Notification)
- Or Trigger: "When event = external.api.capacity.available" and Action: "Resume Flow Run" with filters to target relevant runs

Links and references
- Blog overview of Automations and events: Automations and observability
- Emitting custom events from code: see prefect.events.utilities.emit_event
- Resuming flows programmatically: see prefect.flow_runs.resume_flow_run

If you describe how your "capacity available" signal is produced (webhook, queue depth, token service), I can sketch a concrete Automation config and, if needed, a minimal controller to resume in batches.
d
@Marvin All I want is to suspend the run and pop it in a queue. Otherwise other flows could just hog capacity and it never gets completed.
m
thought for 126.9 seconds
Totally reasonable goal. Prefect doesn't have a first-class FIFO message queue, but you can achieve "suspend and put it in a fair global queue" cleanly with one of these patterns:

Option 1: Pure Prefect queue with suspension + controlled draining
- What you get: suspended runs live in the Prefect DB; no worker slots are held; a small controller or Automation drains them in order.
- How:
  1) In your flow, when you hit the rate gate, call suspend_flow_run(timeout=...). Tag the run (e.g., external-api) to identify it.
  2) Create a "drain" process:
     - Either an Automation that triggers on a periodic schedule or an external controller (tiny script/deployment).
     - It lists Suspended runs with your tag, sorts FIFO (by created/updated time), and resumes a bounded batch each tick to avoid thundering herds.
     - Work pool concurrency limits ensure only N resume into Running at once.
- Minimal controller sketch:
Copy code
from prefect.client.orchestration import get_client
from prefect.flow_runs import resume_flow_run
import asyncio

BATCH = 20  # resume up to 20 per tick
TAG = "external-api"

async def drain_once():
    async with get_client() as client:
        # Query suspended runs with the tag; adjust filters as needed
        runs = await client.read_flow_runs(
            flow_run_filter={
                "states": {"type": {"any_": ["SUSPENDED"]}},
                "tags": {"all_": [TAG]},
            },
            limit=BATCH,
            sort="CREATED"  # or "EXPECTED_START_TIME"/"UPDATED"
        )
        for fr in runs:
            await resume_flow_run(fr.id)

if __name__ == "__main__":
    asyncio.run(drain_once())
- Run it on a schedule (cron, Cloud Run, K8s CronJob) or behind a webhook.

Option 2: Event-driven resume (still FIFO-ish)
- Emit a custom event when capacity tokens become available. An Automation listens and resumes up to N suspended runs each time.
- You can chain Automations or a small service to ensure you don't resume more than your allowed rate/concurrency.

Option 3: True FIFO with an external queue
- If strict ordering and durability matter, put requests into SQS/Pub/Sub/Redis.
- Your flows enqueue and suspend. A dedicated Prefect consumer deployment drains the queue at your desired rate/concurrency and resumes or notifies when done.
- This separates fairness strictly from flow scheduling and gives you battle-tested queue semantics.

Keep other flows from hogging capacity
- Use a work-pool concurrency limit for the consumer pool to cap how many resumes become active at once.
- Combine with tag-based limits or rate_limit tokens on the actual API call to enforce throughput.
- The controller/Automation enforces fairness by draining FIFO in bounded batches.

Why this works
- Suspended runs are just DB rows; they don't consume worker resources.
- Your "queue" is the set of Suspended runs filtered by a tag; the controller gives you ordering and fairness.
- Resuming in batches + pool concurrency prevents stampedes while ensuring steady progress.

If you tell me your target policy (e.g., FIFO, resume 50/minute, max 5 in-flight) and where you run (Cloud/self-hosted, K8s, etc.), I'll tailor the exact Automation settings, filters, and a production-ready drain script.
d
@Marvin So could the automation just check every, say 500ms? Can it check for the number of currently available tokens? Can it be triggered when something is added to the suspend queue? What is the overhead for suspending and restarting a flow?
m
thought for 67.1 seconds
Great questions—here are practical answers to each.

Can an Automation check every 500ms?
- Automations are event-driven and support interval/scheduled triggers, but sub-second polling is not what they're designed for. If you truly need a 500ms cadence, run a tiny controller service (a daemon/cron job) that uses the Prefect API to drain the queue; keep Automations for event-driven or second-level cadence.

Can an Automation check how many tokens are available?
- Not directly in the Automation condition language. You can:
  - Query global concurrency limits via API/CLI to compute available tokens (limit - active_slots), then resume that many runs. Inspect example: prefect global-concurrency-limit inspect external-api-rps
  - Or have your token service emit a custom event with the available count; Automations can react to that event and call a webhook or run a small controller.

Can it be triggered when something is added to the suspend queue?
- Yes. You can trigger on "flow run entered Suspended state" (and filter by tag/flow/deployment). Typically, you'll use that signal to wake a controller or increment a metric, then let the controller drain fairly at your chosen rate to avoid stampedes.

Overhead for suspending and restarting a flow
- Suspension is a state transition persisted in Prefect's DB; the running infrastructure stops. Overhead is small (a DB write plus API work).
- Resumption schedules the run again. Actual latency/overhead is dominated by your infrastructure:
  - Process worker: typically sub-second to a couple of seconds to re-exec the flow process.
  - Docker/Kubernetes/ECS: container/pod/task startup (seconds to tens of seconds).
- Pickup latency also depends on worker polling config. You can lower worker prefetch/poll intervals for faster pickup, but keep them reasonable to avoid heavy API load.

Recommended pattern for a fair, global queue
- Suspend when you can't get a token, then drain FIFO in bounded batches based on available tokens. Example controller (runs every 500ms if you like):
Copy code
import asyncio
from prefect.client.orchestration import get_client
from prefect.flow_runs import resume_flow_run

TAG = "external-api"
GCL_NAME = "external-api-rps"
MAX_RESUMES_PER_TICK = 10
SLEEP_SECONDS = 0.5  # 500ms

async def available_tokens(client, gcl_name):
    gcls = await client.read_global_concurrency_limits(name=gcl_name)
    if not gcls:
        return 0
    gcl = gcls[0]
    # available = limit - active_slots (rounded for safety)
    return max(0, int(gcl.limit - (gcl.active_slots or 0)))

async def drain_once():
    async with get_client() as client:
        tokens = await available_tokens(client, GCL_NAME)
        budget = min(tokens, MAX_RESUMES_PER_TICK)
        if budget <= 0:
            return

        runs = await client.read_flow_runs(
            flow_run_filter={
                "states": {"type": {"any_": ["SUSPENDED"]}},
                "tags": {"all_": [TAG]},
            },
            limit=budget,
            sort="CREATED"  # FIFO-ish
        )
        for fr in runs:
            await resume_flow_run(fr.id)

async def main():
    while True:
        try:
            await drain_once()
        finally:
            await asyncio.sleep(SLEEP_SECONDS)

if __name__ == "__main__":
    asyncio.run(main())
Key controls
- Use a tag (e.g., external-api) on flows/tasks that should join the queue.
- Use a global concurrency limit or token-bucket GCL for your API (e.g., external-api-rps) to represent capacity.
- Work-pool concurrency limits cap how many resumes can become active at once, preventing other flows from hogging capacity.

Helpful references
- Concurrency limits: Concurrency limits
- Global limits (token bucket): Global concurrency limits
- API client (for controllers): API client guide
- Automations overview: Automations and observability

If you share your target policy (e.g., FIFO, drain up to 25 runs per second, cap 5 in-flight) and where you're running (K8s, VMs), I can tailor the controller and any Automation filters to your setup.
d
Can automations be run locally too? @Marvin
m
thought for 32.9 seconds
Short answer: yes—Automations run wherever your Prefect API server runs. That can be Prefect Cloud or your own self-hosted server instance (which can be "local" on your laptop or in your cluster).

Key points
- Automations are a server feature, not tied to workers. If you run Prefect Server yourself, the automation engine evaluates triggers and executes actions there.
- Local/self-hosted: start the server (prefect server start) and keep services enabled; Automations will fire based on your triggers. Don't use --no-services if you want background services (scheduler, etc.) active.
- Cloud: Automations run in Prefect Cloud's control plane automatically.
- No-server mode: there's no "standalone automation" process without a server. For sub-second control or custom logic, run a small controller script that uses the Prefect API; that can run locally anywhere.

When to use which
- If you already use Prefect Cloud: configure Automations in the UI and you're done.
- If you self-host: run Prefect Server (Docker/K8s/VM or even on your laptop) and define Automations there.
- If you need a 500ms cadence or custom fairness: a tiny controller service that calls the API (resume in batches, check available tokens) complements or replaces Automations for that specific loop.

References
- Server concepts and self-hosting: Prefect Server concepts and Self-hosted Prefect
- Automations overview (blog): Automations and observability

Are you on Prefect Cloud or planning to run Prefect Server locally/on your own infra? I can outline the exact setup for your case.
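If you do run the server locally, one extra step worth remembering is pointing your CLI and flows at it (standard command; the URL shown is the local default):
Copy code
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"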
d
@Marvin - how about a task that uses a rate limited API triggering a check for suspended flows when that task is done?
m
thought for 75.5 seconds
Yes—that's a solid pattern. You can have any task that finishes an API call "poke" a centralized drain to resume suspended runs. Two good ways to implement it:

Option A: Trigger a drain controller deployment from the task
- At the end of your rate-limited task, kick off a small "drain" flow as a separate deployment that resumes a bounded batch of suspended runs. Guard it with a global mutex so only one drain runs at a time.

In your API task:
Copy code
from prefect import task
from prefect.deployments.flow_runs import run_deployment

@task
def call_external_api(...):
    # ... do the API work ...
    # fire-and-forget a drain run (as a separate flow run)
    run_deployment(
        name="drain-controller/drain",
        parameters={"budget": 10},   # how many to resume per tick
        as_subflow=False,
        timeout=0,  # return immediately instead of waiting for the drain run to finish
    )
Drain flow (sketch):
Copy code
from prefect import flow, get_run_logger
from prefect.concurrency.sync import concurrency
from prefect.client.orchestration import get_client
from prefect.flow_runs import resume_flow_run
import asyncio

TAG = "external-api"           # tag you applied to queued runs
GCL_NAME = "external-api-rps"  # token-bucket global limit name

@flow
def drain(budget: int = 10):
    log = get_run_logger()
    # distributed mutex to prevent multiple drains running concurrently
    with concurrency("resume-controller", occupy=1, timeout_seconds=0):
        asyncio.run(_drain_async(budget))
        <http://log.info|log.info>("Drain complete")

async def _drain_async(budget: int):
    async with get_client() as client:
        # Optionally compute available tokens from a global limit
        gcls = await client.read_global_concurrency_limits(name=GCL_NAME)
        if gcls:
            available = max(0, int(gcls[0].limit - (gcls[0].active_slots or 0)))
            budget = min(budget, available) if available else 0
        if budget <= 0:
            return

        # FIFO-ish: fetch the oldest suspended runs for our tag
        runs = await client.read_flow_runs(
            flow_run_filter={
                "states": {"type": {"any_": ["SUSPENDED"]}},
                "tags": {"all_": [TAG]},
            },
            limit=budget,
            sort="CREATED",
        )
        for fr in runs:
            await resume_flow_run(fr.id)
Why this works
- Your task stays simple; the drain logic is centralized, fair, and capped.
- The global mutex (concurrency("resume-controller") with a limit of 1) avoids multiple drains racing when many tasks finish at once.
- You can tie the "budget" to available tokens from a global limit or keep it fixed.

Option B: Emit an event, let an Automation run the drain
- At the end of the API task, emit a custom event (e.g., "external.api.capacity.released").
- An Automation listens for that event and runs the "drain-controller" deployment. This decouples your task from orchestration specifics.

In your task:
Copy code
from prefect import task
from prefect.events.utilities import emit_event

@task
def call_external_api(...):
    # ... do API work ...
    emit_event(
        event="external.api.capacity.released",
        resource={"prefect.resource.id": "global/external-api"},
        payload={"hint": "ok"},  # optional
    )
Automation
- Trigger: event = external.api.capacity.released
- Action: Run Deployment (drain-controller/drain) with parameters

Notes and guardrails
- Keep the drain batch small and frequent rather than large and rare, to avoid stampedes.
- Combine with work-pool concurrency limits to cap how many resumes become active at once.
- If you use a token-bucket global limit (slot decay), the "available tokens" check lets you resume only what the system can sustainably process.

Docs
- Global concurrency limits (token bucket): Global concurrency limits
- Concurrency locks in code: Concurrency limits
- API client patterns: Prefect API client

If you share your target throughput (e.g., "resume up to 20/s, max 5 in-flight") and where you're running (Cloud, K8s, etc.), I can tailor the drain flow and Automation config.
d
@Marvin and just to double check, does Prefect have queues to start workflows?
m
thought for 54.5 seconds
Yes—Prefect has "work pools" and optional "work queues" that workers poll to start flow runs.

How it works
- Work pool: the top-level routing target for a deployment; workers attach to a pool and pull eligible flow runs.
- Work queue: an optional sub-queue within a pool that further filters/slices runs. Workers can subscribe to one or more queues for finer routing.
- Workers poll the Prefect API (not the DB) for queued/scheduled runs in their pool/queues and start them on your infrastructure.

Manage queues (CLI we verified)
- Create/list/inspect/pause/resume/preview:
prefect work-queue create ...
-
prefect work-queue ls
-
prefect work-queue inspect <id>
-
prefect work-queue pause <id>
/
prefect work-queue resume <id>
-
prefect work-queue preview <id>
- Per-queue concurrency limit: -
prefect work-queue set-concurrency-limit <id> <limit>
-
prefect work-queue clear-concurrency-limit <id>
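For example, a dedicated queue for API-calling runs inside an existing pool might look roughly like this (a sketch; names are illustrative and the exact flags are worth confirming with prefect work-queue create --help and prefect worker start --help):
Copy code
prefect work-queue create "api-callers" --pool "my-pool" --limit 5
prefect worker start --pool "my-pool" --work-queue "api-callers"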
Important notes
- These are orchestration queues, not a general-purpose FIFO like SQS. Prefect schedules/prioritizes runs (typically by scheduled time/priority) and stores state durably in its database.
- A deployment targets a work pool (and optionally a work queue); workers started for that pool/queue will pick up runs.
- Use pool/queue concurrency to control how many flow runs start at once; use task/tag/global limits to throttle shared resources within runs.

Docs
- Work pools and routing overview: Work pools
- CLI reference (queues): run prefect work-queue --help for the full command set in your environment.

If you tell me your routing goals (e.g., split by environment, priority tiers), I can suggest a pool/queue layout and how to target them from deployments.
d
@Marvin Would you recommend using a new prefect flow to process each row if I have millions of rows, but each row requires passing through a pipeline?
m
thought for 42.8 seconds
Short answer: I would not create a separate flow run per row if you have millions of rows. That's a lot of orchestration overhead and will stress the API/DB, slow startup, and clutter observability. Instead, batch and parallelize within flows, or partition the work into a manageable number of flow runs.

Recommended patterns
- Single flow, task-level parallelism (best for up to tens/hundreds of thousands of items)
  - Read rows, chunk them into batches, and submit a task per row within a batch using a task runner.
  - Use ThreadPoolTaskRunner (I/O) or ProcessPoolTaskRunner (CPU). For very large/concurrent workloads, use DaskTaskRunner or RayTaskRunner to spread across a cluster.
  - Control throughput with tag-based concurrency limits and rate limits to protect external services.

Example (batch + parallel tasks)
Copy code
from prefect import flow, task, futures
from prefect.task_runners import ThreadPoolTaskRunner

@task(retries=3, retry_delay_seconds=5, persist_result=False)
def process_row(row):
    # call APIs, transform, write out
    return row["id"]

def chunk(iterable, size):
    buf = []
    for x in iterable:
        buf.append(x)
        if len(buf) == size:
            yield buf
            buf = []
    if buf:
        yield buf

@flow(task_runner=ThreadPoolTaskRunner(max_workers=64))
def pipeline(rows, batch_size: int = 5_000):
    for batch in chunk(rows, batch_size):
        futs = [process_row.submit(r) for r in batch]  # .submit() returns PrefectFutures
        for f in futures.as_completed(futs):
            _ = f.result()  # handle/log result
- Partitioned flows (when total items are huge)
  - Create a deployment and launch many flow runs, each responsible for a partition (e.g., per file/day/shard/customer) of, say, 10k–100k rows. Inside each run, use task-level parallelism as above.
  - This keeps orchestration objects to thousands of flow runs, not millions.
- Streaming/consumer pattern (when rows arrive continuously)
  - Use an external queue (SQS/Pub/Sub/Kafka) and run a consumer flow/deployment that pulls messages and processes them with controlled concurrency. This decouples producers from processing and avoids spawning a flow run per row.

Why avoid a flow-per-row
- Startup/pickup latency per flow run adds up.
- Massive DB pressure (run/state/log rows).
- Harder to observe and manage millions of runs.
- Cost/throughput inefficiency vs. batching.

Throughput and fairness controls
- Protect shared services: tag the API task and set a concurrency limit, and/or use a global rate limit (token bucket).
- For long waits (rate limits, tokens): try-acquire, then suspend the flow run and resume later so you don't hold worker slots.
- Use work-pool concurrency limits to cap how many flow runs execute at once; inside each flow, use task runners to parallelize efficiently.

When would per-row flows make sense?
- If each row must be a fully isolated, independently retryable, auditable unit with distinct SLAs and you're only dealing with thousands (not millions). Even then, consider "micro-batching" (e.g., 100–1,000 rows per flow run) to control overhead.

Helpful links
- Task runners: Task runners
- Dask integration: Prefect Dask
- Ray integration: Prefect Ray
- Concurrency limits: Concurrency limits

If you can share:
- approx rows, per-row cost (I/O vs CPU), and required throughput
- whether you have a Dask/Ray cluster
- any strict per-row audit/traceability needs
…I can suggest concrete batch sizes, runner choice, and concurrency settings.
d
When running multiple flows on a single worker, how big is the overhead per flow? @Marvin
m
thought for 170.7 seconds
Short answer: there isn't a fixed "per-flow overhead" number from Prefect. It depends on your worker type (subprocess vs Docker vs Kubernetes), your Python environment, and what your flow imports. Prefect itself is lightweight; most memory comes from the Python interpreter and your libraries.

Typical ranges in practice for a trivial flow:
- Subprocess worker: each flow run is its own Python process. Baseline RSS is often in the ~50-150 MB range per flow (Python + Prefect + your imports), with negligible idle CPU (<1%) beyond heartbeats/logging.
- Docker worker: similar to subprocess for the process inside the container, plus a few MB of container/runtime bookkeeping. Node-wide Docker daemon overhead is shared across containers.
- Kubernetes worker: similar to Docker per pod. If you add sidecars, init containers, or service meshes, those can add 50-200+ MB per pod depending on what's running.

Other overhead to keep in mind:
- The worker process itself (separate from flows) typically uses tens of MB.
- Heartbeat/logging network calls are small and periodic.
- The biggest driver is almost always your imports and what the flow actually does, not Prefect's orchestration.

If you can share:
- which worker type you're using (subprocess, Docker, Kubernetes)
- your Python version and any big libraries (pandas, PyTorch, etc.)
- whether you're running many flows concurrently

…I can give a tighter estimate. In the meantime, here's a quick way to measure in your exact environment (run this in the same image/env your worker uses):
Copy code
import os
import gc
import psutil
from prefect import flow, get_run_logger

def rss_mb():
    return psutil.Process(os.getpid()).memory_info().rss / (1024 ** 2)

@flow
def measure_baseline():
    logger = get_run_logger()
    gc.collect()
    <http://logger.info|logger.info>(f"RSS at flow start: {rss_mb():.1f} MB")
    # Import your typical libraries to see the true baseline
    # import pandas as pd; import numpy as np; ...
    gc.collect()
    <http://logger.info|logger.info>(f"RSS after imports: {rss_mb():.1f} MB")

if __name__ == "__main__":
    measure_baseline()
Run a few of these concurrently to see aggregate usage. On Docker/K8s you can also compare with docker stats or kubectl top pod.

References:
- Manage work pools and workers (Prefect 3.x)
- Work pools and workers (blog overview)
- Prefect 3 GA notes
d
@Marvin I am running prefect server in a docker container and want to run my flow on it. How do I do that?
m
thought for 232.3 seconds
Great question — assuming you're on Prefect 3.x, here's the quickest way to run your flow against your self-hosted server running in Docker.

High-level steps
1) Point your CLI/worker to your server (set PREFECT_API_URL)
2) Create a work pool
3) Start a worker
4) Create a deployment for your flow
5) Trigger a run (from the UI or CLI)

Step 1: Point to your server
If your server is exposed on localhost:4200
- Bash/Zsh:
Copy code
export PREFECT_API_URL=<http://localhost:4200/api>
- PowerShell:
Copy code
$env:PREFECT_API_URL = "<http://localhost:4200/api>"
Note on Docker networking:
- If your worker runs on your host machine, use http://localhost:4200/api.
- If your worker runs in another Docker container, use the Docker service name or container hostname, e.g. http://prefect-server:4200/api.

Docs: Manage Settings

Step 2: Create a work pool
- Process worker (no containers, runs Python locally):
Copy code
prefect work-pool create -t process process-pool
- Docker worker (runs each flow in a container):
Copy code
prefect work-pool create -t docker docker-pool
Tip: For Docker worker, install extras:
pip install "prefect[docker]"
Step 3: Start a worker
- Process worker:
Copy code
prefect worker start -p process-pool
- Docker worker:
Copy code
prefect worker start -p docker-pool
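If both the server and a worker live in the same Docker Compose file, the worker service might look roughly like this (a sketch; the image tag, pool name, and service names are illustrative):
Copy code
services:
  worker:
    image: prefecthq/prefect:3-latest
    command: prefect worker start -p process-pool
    environment:
      PREFECT_API_URL: http://server:4200/api   # "server" = your Prefect server service name
    depends_on:
      - server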
Step 4: Create a deployment for your flow

Option A: Python API (nice for quick starts)
Example flow file: flows/my_flow.py
Copy code
from prefect import flow

@flow
def my_flow(name: str = "world"):
    print(f"Hello, {name}!")

if __name__ == "__main__":
    my_flow()
Process worker deployment (local code):
Copy code
from prefect.flows import Flow

Flow.from_source(
    ".",                        # path to your repo or project root
    "flows/my_flow.py:my_flow"  # entrypoint: file_path:flow_name
).deploy(
    name="hello-process",
    work_pool_name="process-pool",
    parameters={"name": "Marvin"},
)
Docker worker deployment (containerized):
Copy code
from prefect.flows import Flow

Flow.from_source(
    ".",
    "flows/my_flow.py:my_flow"
).deploy(
    name="hello-docker",
    work_pool_name="docker-pool",
    image="your-docker-user/prefect-flows:latest",  # must be pullable by the worker
    build=True,   # build an image from your project
    push=True,    # push image to your registry
    parameters={"name": "Marvin"},
)
Option B: CLI via prefect.yaml Initialize once:
Copy code
prefect init
Edit prefect.yaml to set your work pool (process-pool or docker-pool), entrypoint, and optionally an image/build stanza for Docker. Then:
Copy code
prefect deploy
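A minimal prefect.yaml for the process-pool case might look roughly like this (a sketch; names match the examples above, and the exact schema is worth checking against the prefect.yaml docs linked below):
Copy code
name: my-project
prefect-version: 3.0.0

deployments:
  - name: hello-process
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: process-pool
    parameters:
      name: Marvin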
Docs:
- prefect.yaml deployments
- Workers API reference
- Blog overview: Work pools and workers

Step 5: Trigger a run
- From the UI: navigate to your deployment and click Run.
- From the CLI:
Copy code
prefect deployment run "my_flow/hello-process"
# or
prefect deployment run "my_flow/hello-docker"
Notes and common gotchas
- Ensure the worker environment has the same code and dependencies as your flow. For Process workers, the code must be available on disk; for Docker workers, the image must include your code and dependencies.
- If your server is in Docker Compose and your worker is in another Compose service, set PREFECT_API_URL to the service name (e.g., http://server:4200/api).
- If you see "No worker found…" or runs stay "Late," your worker likely isn't connected to the right work pool or cannot reach the API (PREFECT_API_URL).

Would you like to run with a Process worker or a Docker worker? Also, where will your worker run (host machine or another container)? I can tailor the exact commands and a minimal prefect.yaml for your setup.