# marvin-ai
@Marvin For running a Work Pool that is Prefect Managed, when calculating the cost/compute used, is there a minimum run time that a flow costs?
thought for 148.1 seconds
Short answer: there isn't a publicly documented "minimum billed duration" for Prefect-managed (serverless) work pools in Prefect Cloud 3.x. Billing for managed compute is consumption-based and accrues while your flow is actually running, but Prefect hasn't published a specific rounding rule (e.g., per-second vs. a 1-minute minimum) in the docs I can find.

References:
- Introducing Prefect Serverless
- New self-serve plans and serverless credits
- Work Pools and Workers overview

If you need a definitive, contract-grade answer for your plan, I can check with our team, or you can reach out via your Cloud workspace support channel. As a quick self-check, you can also run a trivial short-lived flow on a Prefect-managed pool and compare serverless credit consumption in your workspace's usage view to confirm there isn't a per-run minimum being applied.

Are you on a specific plan (Hobby/Starter/Team/Enterprise)? If you share that, I can point you to the most relevant usage/credits details.
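If you want to run that self-check, a minimal probe could look like this (a sketch; the repo URL, entrypoint, and pool name are placeholders for your own setup — Prefect-managed pools pull code from remote storage, hence the deploy-from-source step):

```python
from prefect import flow

@flow(log_prints=True)
def noop():
    # exits almost immediately; any per-run billing minimum would show up
    # as a floor on this run's credit usage in the usage view
    print("done")

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # placeholder repo
        entrypoint="probe.py:noop",                      # placeholder entrypoint
    ).deploy(name="billing-probe", work_pool_name="my-managed-pool")
```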
@Marvin I have a flow that runs a query and returns a list of values; I want to pass each one into a deployment. What is the best way to execute this?
thought for 180.1 seconds
Great question. In Prefect 3.x, the usual pattern is to have an "orchestrator" flow that: 1) queries for the list of values, and 2) fans out by triggering your target deployment once per value, passing each value as a parameter. You can do this with `prefect.deployments.run_deployment` and choose whether to wait for the child runs (subflows) to finish or just fire-and-forget.

Option A — Fan out and wait for all runs to finish (subflows)
- Good when the parent flow should succeed/fail based on all children.
- Uses `run_deployment(..., parameters=...)`. By wrapping the call in a task and submitting it, you get parallel fan-out.
```python
from prefect import flow, task
from prefect.deployments import run_deployment

def query_for_values() -> list[str]:
    # placeholder: replace with your real query
    return []

@task
def trigger_one(value: str):
    # pass your parameter to the deployment; by default run_deployment
    # waits for the child run to finish (timeout=None)
    return run_deployment(
        name="my-flow/my-deployment",
        parameters={"item": value},  # adjust to your deployment's parameter name(s)
        # as_subflow=True (default) keeps parent/child lineage
    )

@flow
def orchestrator():
    values = query_for_values()  # your query here -> returns a list
    futures = [trigger_one.submit(v) for v in values]  # fan-out
    results = [f.result() for f in futures]           # wait for all
    return results
```
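If you want to cap how many child runs are triggered at once in Option A, one option (a sketch reusing `trigger_one` and `query_for_values` from the block above; `ThreadPoolTaskRunner` is Prefect 3.x's thread-based task runner) is to bound the parent flow's task runner:

```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

# at most 5 trigger_one tasks (and hence 5 run_deployment calls) in flight
@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def orchestrator():
    values = query_for_values()
    futures = [trigger_one.submit(v) for v in values]
    return [f.result() for f in futures]
```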
Option B — Fire-and-forget (don't wait for completion)
- Good when you want the orchestrator to just enqueue runs and exit.
- You can either call `run_deployment(..., timeout=0)`, which returns as soon as the run is created (add `as_subflow=False` if you don't want parent/child lineage), or use the Prefect client to create runs from the deployment (see the client sketch after the example below).
```python
from prefect import flow, task
from prefect.deployments import run_deployment

def query_for_values() -> list[str]:
    # placeholder: replace with your real query
    return []

@task
def trigger_one(value: str):
    # timeout=0 returns as soon as the flow run is created (fire-and-forget);
    # as_subflow=False skips parent/child lineage so the run stands alone
    fr = run_deployment(
        name="my-flow/my-deployment",
        parameters={"item": value},
        as_subflow=False,
        timeout=0,
    )
    return fr.id

@flow
def orchestrator():
    values = query_for_values()
    ids = [trigger_one.submit(v).result() for v in values]
    return ids
```
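And the client-based variant mentioned above might look like this (a sketch; `read_deployment_by_name` and `create_flow_run_from_deployment` are the relevant Prefect 3.x client calls, and the deployment name and parameter are placeholders):

```python
import asyncio
from prefect.client.orchestration import get_client

async def enqueue_all(values: list[str]):
    async with get_client() as client:
        # look up the deployment once, then create one run per value
        deployment = await client.read_deployment_by_name("my-flow/my-deployment")
        for v in values:
            await client.create_flow_run_from_deployment(
                deployment.id,
                parameters={"item": v},
            )

asyncio.run(enqueue_all(["a", "b"]))
```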
Controlling parallelism
- If you expect a large list and want to avoid overwhelming your workers, you can add a concurrency limit around the trigger task using Prefect's concurrency controls (see the sketch at the end of this message), or simply batch the fan-out list.
- Prefect's concurrency tools: Concurrency concepts

Helpful references
- Deployments concept: Deployments in Prefect 3
- API (creating runs from a deployment): Create flow run from deployment

A couple of quick clarifiers to tailor this:
- Do you want the parent flow to wait for all child runs to complete, or just submit them and exit?
- Roughly how many values are you expecting per run (so we can recommend a sane concurrency/batching strategy)?
- Are you routing these runs to a specific work pool or work queue that needs `job_variables`?
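Here is the concurrency-limit sketch referenced above (the limit name `deploy-fanout` is hypothetical; create it first, e.g. with `prefect gcl create deploy-fanout --limit 5`):

```python
from prefect import task
from prefect.concurrency.sync import concurrency
from prefect.deployments import run_deployment

@task
def trigger_one(value: str):
    # occupy one slot of the "deploy-fanout" global concurrency limit for the
    # duration of the call; with a limit of 5, at most 5 of these run at once
    with concurrency("deploy-fanout", occupy=1):
        return run_deployment(
            name="my-flow/my-deployment",
            parameters={"item": value},
        )
```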
@Marvin Often there will be no jobs to run, though the maximum (very rare) would be around 50. The job will run every half an hour to check the table, and it does require job variables to be passed in.
thought for 111.1 seconds
Got it. Given:
- often zero items, occasionally up to ~50
- runs every 30 minutes to check a table
- you need to pass job variables per run

Recommendation: use an "orchestrator" flow that queries the table and, only if there are items, fans out by triggering your target deployment once per item, passing both parameters and job variables to each run. Because you don't want the orchestrator to sit around, I'd make these fire-and-forget (`timeout=0`, `as_subflow=False`), and optionally submit in small batches to avoid a big burst.

Example pattern
```python
from datetime import datetime, timezone
from prefect import flow, task
from prefect.deployments import run_deployment

# Replace with your deployment path "flow_name/deployment_name"
DEPLOYMENT_NAME = "my-flow/my-deployment"

def query_for_rows():
    # Return a list of dicts with both the param(s) and job variables you need
    # Example shape:
    # [
    #   {"item": "A", "job_vars": {"env": {"TENANT_ID": "abc"}}},
    #   {"item": "B", "job_vars": {"env": {"TENANT_ID": "xyz"}}},
    # ]
    return []

def batched(seq, size):
    for i in range(0, len(seq), size):
        yield seq[i:i+size]

@task
def trigger_one(item_value, job_vars, queue_name=None, window_id=None):
    # Use idempotency_key to protect against duplicate submits if the orchestrator retries
    idem = f"{DEPLOYMENT_NAME}:{item_value}:{window_id}" if window_id else f"{DEPLOYMENT_NAME}:{item_value}"
    fr = run_deployment(
        name=DEPLOYMENT_NAME,
        parameters={"item": item_value},       # adjust to your flow's parameter(s)
        job_variables=job_vars,                # pass through row-specific job variables
        as_subflow=False,                      # don't link as a subflow of the orchestrator
        timeout=0,                             # fire-and-forget: return once the run is created
        work_queue_name=queue_name,            # set if you route via a specific queue
        flow_run_name=f"process-{item_value}", # helpful for traceability
        idempotency_key=idem,
    )
    return fr.id

@flow
def orchestrator(queue_name: str | None = None, batch_size: int = 10):
    rows = query_for_rows()

    if not rows:
        # nothing to do — exit quickly
        return []

    # Optional: identify the 30-min window to de-duplicate within this interval
    window_id = datetime.now(timezone.utc).strftime("%Y%m%d%H%M")  # or round to 30m if you prefer (see the helper below)

    created_ids = []
    for chunk in batched(rows, batch_size):
        futs = [trigger_one.submit(r["item"], r["job_vars"], queue_name, window_id) for r in chunk]
        # .result() returns quickly because timeout=0 only creates the runs
        created_ids.extend(f.result() for f in futs)

    return created_ids
```
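For the "round to 30m" option in the comment above, a small helper might look like this (a sketch):

```python
from datetime import datetime, timezone

def half_hour_window_id(now: datetime | None = None) -> str:
    # floor to the nearest 30-minute boundary so orchestrator retries within
    # the same scheduling window produce the same idempotency key
    now = now or datetime.now(timezone.utc)
    floored = now.replace(minute=(now.minute // 30) * 30, second=0, microsecond=0)
    return floored.strftime("%Y%m%d%H%M")
```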
Notes
- job_variables: the exact keys depend on your work pool's base job template (Managed, Docker, Kubernetes, etc.). Open your work pool in the UI to see the available job variables and their names; pass that dict as `job_vars` per item. Example shapes often include `env`, image/config fields, and resource settings, but the keys vary by template.
- Concurrency: with a rare max of ~50, simple batching (e.g., 10 at a time) is a safe default. You can tune `batch_size` to match your pool capacity.
- Waiting vs. not waiting: the above is fire-and-forget. If you want the parent to wait on children, drop `timeout=0` (so `run_deployment` blocks until the child finishes) and set `as_subflow=True`; then calling `f.result()` after submission will wait for subflow completion.
- Routing: if you use a specific work queue, pass `work_queue_name`.

Helpful docs
- Deployments: Deployments in Prefect 3
- Job variables overview: Customize job variables
- API (create flow run from deployment): Create flow run from deployment

If you share which work pool type you're using (Managed, Docker, K8s, etc.), I can sketch the exact `job_variables` structure to pass per item. Also, do you want the orchestrator to wait for children or just enqueue and exit?