# ask-marvin
a
@Marvin there are 1000 pending flow runs. I have to run those using the "Copy to new run" option. How to do this without manually selecting each one by one?
m
thought for 478.6 seconds
Got it: there isn't a bulk "Copy to new run" button in the UI, but you can do this safely via the API. Assuming Prefect 3.x, here's a small script that:
- finds all PENDING flow runs (optionally filtered by deployment)
- creates a new run for each using the same deployment, parameters, tags, and name
- optionally cancels the original run (commented out)

Save it as `bulk_copy_runs.py` and run it from a machine that's logged into your Prefect workspace (PREFECT_API_URL/PREFECT_API_KEY set, or a CLI profile in use).
```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    DeploymentFilterName,
    DeploymentFilter,
)
from prefect.states import StateType
# from prefect.states import Cancelled  # uncomment if you want to cancel originals

# CONFIG
LIMIT = 1000            # how many to process
ONLY_DEPLOYMENT = None  # e.g. "my-deployment" (deployment name only); leave None to copy all
NAME_SUFFIX = " (copy)" # suffix for new runs; set to "" to keep same name

async def main():
    async with get_client() as client:
        # Build filter for PENDING runs
        flow_run_filter = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.PENDING])
            )
        )

        # Optional: restrict to a single deployment by its deployment name
        deployment_filter = None
        if ONLY_DEPLOYMENT:
            deployment_filter = DeploymentFilter(name=DeploymentFilterName(any_=[ONLY_DEPLOYMENT]))

        runs = await client.read_flow_runs(
            flow_run_filter=flow_run_filter,
            deployment_filter=deployment_filter,
            limit=LIMIT,
        )

        print(f"Found {len(runs)} pending runs")

        created = 0
        for r in runs:
            # Ensure we have full details (parameters, tags, etc.)
            run = await client.read_flow_run(r.id)

            new = await client.create_flow_run_from_deployment(
                deployment_id=run.deployment_id,
                parameters=run.parameters or {},
                tags=run.tags or [],
                name=(f"{run.name}{NAME_SUFFIX}") if run.name else None,
                # job_variables=run.job_variables or {},  # uncomment if you use job variables
                # scheduled_start_time=None,  # run immediately
            )

            print(f"Copied {run.id} -> {new.id}")
            created += 1

            # Optional: cancel the original run after successful copy
            # await client.set_flow_run_state(run.id, state=Cancelled())

        print(f"Created {created} new runs")

if __name__ == "__main__":
    asyncio.run(main())
```
How to run:
- Make sure you're authenticated (e.g., `prefect cloud login`, or set `PREFECT_API_URL`/`PREFECT_API_KEY`)
- Run: `python bulk_copy_runs.py`
Questions to confirm so I can tailor it:
- Are all 1,000 pending runs for the same deployment, or multiple deployments?
- Do you want the script to cancel the original pending runs after creating the copies?
- Do you need to copy job variables as well? If so, I'll enable that line.
a
@Marvin yes, same deployment; yes, cancel the runs after creating the copies; no need for job variables. This is Prefect 2.18, not Prefect 3.
m
thought for 174.7 seconds
Thanks. Since you're on Prefect 2.18, here's a script that will:
- find all PENDING runs for a single deployment
- create a new run for each with the same parameters/tags/name
- cancel the original runs after each copy is created

Note: make sure you have an agent/worker running so the new runs will execute.
```python
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterDeploymentId,
)
from prefect.states import StateType, Cancelled

# CONFIG
DEPLOYMENT_NAME = "your-flow-name/your-deployment-name"  # e.g., "etl/daily-load"
LIMIT = 1000
CONCURRENCY = 20        # adjust to control API load
NAME_SUFFIX = ""        # or " (copy)" if you want to mark the new runs

async def copy_and_cancel_one(client, run):
    # Get full details for parameters/tags/name
    run = await client.read_flow_run(run.id)

    # Create new run on the same deployment
    new = await client.create_flow_run_from_deployment(
        deployment_id=run.deployment_id,
        parameters=run.parameters or {},
        tags=run.tags or [],
        name=(f"{run.name}{NAME_SUFFIX}") if run.name else None,
    )
    print(f"Copied {run.id} -> {new.id}")

    # Cancel the original
    await client.set_flow_run_state(run.id, state=Cancelled())
    print(f"Cancelled original {run.id}")

async def main():
    async with get_client() as client:
        # Resolve the deployment by its "<flow>/<deployment>" name
        # (read_deployment_by_name expects the combined "flow-name/deployment-name" string)
        deployment = await client.read_deployment_by_name(DEPLOYMENT_NAME)
        if not deployment:
            raise RuntimeError(f"Deployment not found: {DEPLOYMENT_NAME}")

        # Find PENDING runs for this deployment
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.PENDING])
            ),
            deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id]),
        )
        runs = await client.read_flow_runs(flow_run_filter=fr_filter, limit=LIMIT)
        print(f"Found {len(runs)} PENDING runs for {DEPLOYMENT_NAME}")

        # Process with limited concurrency
        sem = asyncio.Semaphore(CONCURRENCY)

        async def guarded(run):
            async with sem:
                try:
                    await copy_and_cancel_one(client, run)
                except Exception as e:
                    print(f"Error processing {run.id}: {e}")

        await asyncio.gather(*(guarded(r) for r in runs))

if __name__ == "__main__":
    asyncio.run(main())
```
How to run:
- Ensure your environment is authenticated to your Prefect 2 workspace (`PREFECT_API_URL`/`PREFECT_API_KEY` or `prefect cloud login`)
- Set `DEPLOYMENT_NAME` to the exact "<flow>/<deployment>" name
- Run: `python bulk_copy_runs.py`
Optional tweaks:
- If your "pending" runs are stuck for several different reasons, you can change the state filter to also include SCHEDULED (or other states) by adding `StateType.SCHEDULED` to the list; see the sketch below.
- Increase/decrease `CONCURRENCY` if you hit API rate limits.

Side note: if/when you upgrade to Prefect 3, the same approach works with minor import changes, but there still isn't a bulk "Copy to new run" button in the UI.
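As an example, the widened filter could look like this; it reuses the imports and the `deployment` lookup from the script above and is just a drop-in replacement for the `fr_filter` assignment:

```python
# Match runs stuck in either PENDING or SCHEDULED ("Late" runs are SCHEDULED under the hood)
fr_filter = FlowRunFilter(
    state=FlowRunFilterState(
        type=FlowRunFilterStateType(any_=[StateType.PENDING, StateType.SCHEDULED])
    ),
    deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id]),
)
```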
a
@Marvin The httpx connection pool is being exhausted in the Prefect 2 agent. How do I increase it?
m
thought for 263.8 seconds
Short answer: there isn't a supported knob in Prefect 2.x to increase the httpx connection pool size used by the agent/client. Prefect doesn't expose httpx's `Limits` (e.g., `max_connections`, `max_keepalive_connections`) via settings or environment variables in 2.x.

What you can do instead:
- Reduce concurrent API load per process:
  - Use work queue/pool concurrency limits in the UI to cap how many flow runs start at once (a script-based sketch follows this message).
  - If you're on workers, cap worker concurrency so each worker submits fewer jobs at a time.
- Run more (smaller) agents/workers instead of one very busy one, to distribute connections across processes.
- Update to the latest 2.x (or 3.x if you can). Older 2.x releases had a few client/connection-handling fixes; upgrading can help prevent connection leaks under load.
- Increase polling intervals or prefetch windows if you've customized them to be very aggressive (fewer, less bursty API calls).
- If you're self-hosting, check any reverse proxy (nginx, ALB, etc.) keepalive and connection limits; too-low upstream limits can cascade into pool exhaustion.

Helpful docs:
- Agents (Prefect 2.x)
- Work pools and workers (Prefect 2.x)
- Connecting to Prefect Cloud / networking notes

If you can share:
- Your exact Prefect version (e.g., 2.13.x)
- Whether you're using the legacy "agent" or the newer "worker" model
- The exact error text/stack trace (e.g., `httpx.PoolTimeout`, `max_connections` exceeded)
- Rough scale (number of flow runs starting concurrently)

…I can suggest a more precise set of steps (and confirm whether there's a known bug fixed in a later version).
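If you'd rather set that work-queue limit from a script than the UI, a minimal sketch with the Prefect 2 client could look like this (the queue name "default" and the limit of 20 are placeholders to adjust for your setup):

```python
import asyncio
from prefect.client.orchestration import get_client

async def cap_queue(queue_name: str = "default", limit: int = 20):
    async with get_client() as client:
        # Look up the work queue, then cap how many runs it will hand out at once
        queue = await client.read_work_queue_by_name(queue_name)
        await client.update_work_queue(queue.id, concurrency_limit=limit)
        print(f"Set concurrency limit {limit} on work queue '{queue_name}'")

if __name__ == "__main__":
    asyncio.run(cap_queue())
```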
a
@Marvin it's Prefect 2.18, agent-based. There is a burst of around 300 incoming flows and the httpx connection pool is exhausted. Will PREFECT_API_REQUEST_TIMEOUT help somehow?
m
thought for 68.1 seconds
Short answer: no. `PREFECT_API_REQUEST_TIMEOUT` only changes how long each request waits before timing out. It does not increase httpx's connection pool size, so it won't fix PoolTimeout / "connection pool exhausted" errors. In fact, a larger timeout can keep connections checked out longer and make pool exhaustion more likely during a burst.

What will help for a 300-run burst with a 2.18 agent:
- Spread the load across processes
  - Run multiple agents (e.g., 3–5) against the same queue(s) so each process has its own httpx pool.
  - Alternatively, split flows across multiple work queues and run one agent per queue.
- Cap concurrent starts to add backpressure
  - Set a concurrency limit on the work queue (or the work pool) so only N runs start at once and the rest wait in the queue.
  - This is the most reliable way to avoid the client bursts that exhaust the pool.
- Stagger scheduling to avoid top-of-minute spikes
  - Add offsets/jitter to schedules so 300 flows don't try to start at the same second (see the sketch after this message).
- Keep current on 2.x, or consider workers
  - You're on 2.18, which is fairly recent, but make sure you're on the latest 2.18 patch.
  - If/when you can, moving to work pools + workers gives you better controls for concurrency and scaling.

Notes:
- Prefect 2.x does not expose httpx's `max_connections`/`max_keepalive_connections` via environment variables, so you can't increase the pool size directly.
- If you're self-hosting the API, also verify upstream limits (reverse proxy keepalive, server workers), but the error you're seeing originates in the agent's client pool.

Helpful references:
- Agents (Prefect 2.x)
- Work queues (Prefect 2.x)
- Concurrency limits (Prefect 2.x)

If you can share:
- The exact exception text (e.g., `httpx.PoolTimeout`)
- Whether you're on Cloud or self-hosted
- How many agents you're running now and how they're deployed

…I can propose a concrete rollout (e.g., number of agents, a safe queue concurrency limit, and scheduling tweaks) tailored to your setup.
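On the staggering point: one option (a sketch only, not a built-in feature) is to create runs with spread-out scheduled start times instead of letting them all become runnable at once. The `copy_with_offset` helper below is hypothetical and assumes a 5-second spacing; it would slot into the bulk-copy loop from the earlier script:

```python
from datetime import datetime, timedelta, timezone

from prefect.states import Scheduled

async def copy_with_offset(client, run, index: int, spacing_seconds: int = 5):
    """Copy a flow run, but schedule it index * spacing_seconds into the future
    so a burst of copies is spread out instead of all starting simultaneously."""
    start = datetime.now(timezone.utc) + timedelta(seconds=index * spacing_seconds)
    return await client.create_flow_run_from_deployment(
        deployment_id=run.deployment_id,
        parameters=run.parameters or {},
        tags=run.tags or [],
        state=Scheduled(scheduled_time=start),
    )
```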
a
@Marvin Can I use Dask to solve this somehow?
m
thought for 172.8 seconds
Short answer: not directly. Dask won't change the agent's httpx connection pool. The pool exhaustion happens when the agent is submitting lots of flow runs to the API at once; Dask only affects parallelism inside a single flow run.

Where Dask can help:
- If you can consolidate those ~300 "things" into fewer flow runs (ideally one) and run the work as parallel tasks with Dask inside that flow, you'll dramatically reduce the agent/API burst load.
- Each flow run has its own process and httpx client; replacing 300 flow runs with 1 flow run + 300 Dask-backed tasks shifts the concurrency into the task runner and keeps the agent from opening hundreds of API connections at once.

Minimal example (Prefect 2.x + prefect-dask):
```bash
pip install prefect-dask dask distributed
```
```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def process_item(i):
    # your work here
    return i * 2

# Local cluster; for a remote scheduler use: DaskTaskRunner(address="tcp://scheduler:8786")
task_runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 16, "threads_per_worker": 1},
)

@flow(name="bulk-run", task_runner=task_runner)
def bulk_run(items: list[int]):
    futures = [process_item.submit(i) for i in items]
    return [f.result() for f in futures]

if __name__ == "__main__":
    bulk_run(list(range(300)))
```
Caveats:
- Tasks still report state to the API, but the request pattern is much gentler than 300 separate flow-run submissions.
- If those 300 flows are triggered independently (e.g., by external webhooks), you'd need to add a buffering/aggregator layer that batches them into one "bulk" flow run; a sketch follows below.

If you can't restructure into batch flows, Dask won't solve the agent's pool exhaustion. In that case:
- Run multiple agents to spread connections across processes.
- Add backpressure (work queue concurrency limits; staggered schedules) so not all 300 start at once.
- Consider migrating to workers/work pools when possible for better concurrency controls.

Docs:
- prefect-dask task runner (Prefect 2.x)
- Task runners (Prefect 2.x)

If you share how those 300 are triggered (one deployment vs. many, schedule vs. event), I can suggest a concrete batching or throttling pattern.
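For the aggregator idea, a rough sketch (the deployment name "bulk-run/bulk" and the `collect_pending_items` helper are placeholders for whatever your buffering layer actually provides) could hand a whole batch to the Dask-backed flow in a single run via `run_deployment`:

```python
from prefect import flow
from prefect.deployments import run_deployment

def collect_pending_items() -> list[int]:
    # Placeholder: read buffered item IDs from your queue/table/bucket here
    return list(range(300))

@flow(name="aggregator")
def aggregator():
    items = collect_pending_items()
    if not items:
        return
    # One flow run for the whole batch instead of one run per item
    run_deployment(
        name="bulk-run/bulk",       # "<flow>/<deployment>" of the Dask-backed flow
        parameters={"items": items},
        timeout=0,                  # return immediately instead of waiting for the run
    )
```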