Abrar Hussain
09/19/2025, 8:54 PM

Marvin
09/19/2025, 9:02 PM
Save the script below as bulk_copy_runs.py and run it from a machine that's logged into your Prefect workspace (PREFECT_API_URL/PREFECT_API_KEY set, or a CLI profile in use).
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    DeploymentFilterName,
    DeploymentFilter,
)
from prefect.states import StateType
# from prefect.states import Cancelled  # uncomment if you want to cancel originals

# CONFIG
LIMIT = 1000             # how many to process
ONLY_DEPLOYMENT = None   # e.g. "my-deployment" (a deployment name), or leave None to copy all
NAME_SUFFIX = " (copy)"  # suffix for new runs; set to "" to keep the same name


async def main():
    async with get_client() as client:
        # Build a filter for PENDING runs
        flow_run_filter = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.PENDING])
            )
        )

        # Optional: restrict to a single deployment by deployment name
        deployment_filter = None
        if ONLY_DEPLOYMENT:
            deployment_filter = DeploymentFilter(
                name=DeploymentFilterName(any_=[ONLY_DEPLOYMENT])
            )

        runs = await client.read_flow_runs(
            flow_run_filter=flow_run_filter,
            deployment_filter=deployment_filter,
            limit=LIMIT,
        )
        print(f"Found {len(runs)} pending runs")

        created = 0
        for r in runs:
            # Ensure we have full details (parameters, tags, etc.)
            run = await client.read_flow_run(r.id)
            if not run.deployment_id:
                # Skip runs that weren't created from a deployment
                continue
            new = await client.create_flow_run_from_deployment(
                deployment_id=run.deployment_id,
                parameters=run.parameters or {},
                tags=run.tags or [],
                name=f"{run.name}{NAME_SUFFIX}" if run.name else None,
                # job_variables=run.job_variables or {},  # uncomment if you use job variables
                # scheduled_start_time=None,              # run immediately
            )
            print(f"Copied {run.id} -> {new.id}")
            created += 1

            # Optional: cancel the original run after a successful copy
            # await client.set_flow_run_state(run.id, state=Cancelled())

        print(f"Created {created} new runs")


if __name__ == "__main__":
    asyncio.run(main())
How to run:
- Make sure you're authenticated (e.g., prefect cloud login, or set `PREFECT_API_URL`/`PREFECT_API_KEY`)
- Run: python bulk_copy_runs.py
Questions to confirm so I can tailor it:
- Are all 1,000 pending runs for the same deployment, or multiple deployments?
- Do you want the script to cancel the original pending runs after creating the copies?
- Do you need to copy job variables as well? If so, I'll enable that line.

Abrar Hussain
09/19/2025, 9:05 PM

Marvin
09/19/2025, 9:08 PM
Here's a version that copies each PENDING run to a new run on the same deployment and then cancels the original:
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterDeploymentId,
)
from prefect.states import StateType, Cancelled

# CONFIG
DEPLOYMENT_NAME = "your-flow-name/your-deployment-name"  # e.g., "etl/daily-load"
LIMIT = 1000
CONCURRENCY = 20   # adjust to control API load
NAME_SUFFIX = ""   # or " (copy)" if you want to mark the new runs


async def copy_and_cancel_one(client, run):
    # Get full details for parameters/tags/name
    run = await client.read_flow_run(run.id)

    # Create a new run on the same deployment
    new = await client.create_flow_run_from_deployment(
        deployment_id=run.deployment_id,
        parameters=run.parameters or {},
        tags=run.tags or [],
        name=f"{run.name}{NAME_SUFFIX}" if run.name else None,
    )
    print(f"Copied {run.id} -> {new.id}")

    # Cancel the original
    await client.set_flow_run_state(run.id, state=Cancelled())
    print(f"Cancelled original {run.id}")


async def main():
    async with get_client() as client:
        # Resolve the deployment by its "<flow>/<deployment>" name
        deployment = await client.read_deployment_by_name(DEPLOYMENT_NAME)
        if not deployment:
            raise RuntimeError(f"Deployment not found: {DEPLOYMENT_NAME}")

        # Find PENDING runs for this deployment
        fr_filter = FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.PENDING])
            ),
            deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id]),
        )
        runs = await client.read_flow_runs(flow_run_filter=fr_filter, limit=LIMIT)
        print(f"Found {len(runs)} PENDING runs for {DEPLOYMENT_NAME}")

        # Process with limited concurrency
        sem = asyncio.Semaphore(CONCURRENCY)

        async def guarded(run):
            async with sem:
                try:
                    await copy_and_cancel_one(client, run)
                except Exception as e:
                    print(f"Error processing {run.id}: {e}")

        await asyncio.gather(*(guarded(r) for r in runs))


if __name__ == "__main__":
    asyncio.run(main())
How to run
- Ensure your environment is authenticated to your Prefect 2 workspace (PREFECT_API_URL/PREFECT_API_KEY, or prefect cloud login)
- Set DEPLOYMENT_NAME to the exact "<flow>/<deployment>" name
- Run: python bulk_copy_runs.py
Optional tweaks
- If your "pending" runs are stuck for multiple reasons, you can change the state filter to include SCHEDULED (or others) by adding StateType.SCHEDULED to the list; a drop-in example follows these tweaks.
- Increase/decrease CONCURRENCY if you hit API rate limits.
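For example, a drop-in replacement for the fr_filter assignment in the script above (imports unchanged) that matches SCHEDULED as well as PENDING runs:

fr_filter = FlowRunFilter(
    state=FlowRunFilterState(
        # match runs stuck in either PENDING or SCHEDULED
        type=FlowRunFilterStateType(any_=[StateType.PENDING, StateType.SCHEDULED])
    ),
    deployment_id=FlowRunFilterDeploymentId(any_=[deployment.id]),
)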
Side note: If/when you upgrade to Prefect 3, the same approach works with minor import changes, but there still isn't a bulk "Copy to new run" button in the UI.

Abrar Hussain
09/22/2025, 6:13 PM

Marvin
09/22/2025, 6:18 PM
There's no supported way to change httpx's connection-pool Limits (e.g., max_connections, max_keepalive_connections) via settings or env vars in 2.x.
What you can do instead
- Reduce concurrent API load per process
  - Use work queue/pool concurrency limits in the UI to cap how many flow runs start at once (a client-based sketch follows this list).
  - If you're on workers, cap worker concurrency so each worker submits fewer jobs at a time.
  - Run more (smaller) agents/workers instead of one very busy one to distribute connections across processes.
- Update to the latest 2.x (or 3.x if you can). Older 2.x releases had a few client/connection-handling fixes; upgrading can help prevent connection leaks under load.
- Increase polling intervals or prefetch windows if you've customized them to be very aggressive (fewer, less bursty API calls).
- If you're self-hosting, check any reverse proxy (nginx, ALB, etc.) keepalive and connection limits; too-low upstream limits can cascade into pool exhaustion.
Helpful docs
- Agents (Prefect 2.x)
- Work pools and workers (Prefect 2.x)
- Connecting to Prefect Cloud / networking notes
If you can share:
- Your exact Prefect version (e.g., 2.13.x)
- Whether you’re using the legacy “agent” or newer “worker” model
- The exact error text/stack trace (e.g., httpx.PoolTimeout, max_connections exceeded)
- Rough scale (number of flow runs starting concurrently)
…I can suggest a more precise set of steps (and confirm if there's a known bug fixed in a later version).

Abrar Hussain
09/22/2025, 6:36 PM

Marvin
09/22/2025, 6:37 PM
PREFECT_API_REQUEST_TIMEOUT only changes how long each request waits before timing out. It does not increase httpx's connection pool size, so it won't fix PoolTimeout / "connection pool exhausted" errors. In fact, a larger timeout can keep connections checked out longer and make pool exhaustion more likely during a burst.
What will help for a 300-run burst with a 2.18 agent
- Spread the load across processes
  - Run multiple agents (e.g., 3–5) against the same queue(s) so each process has its own httpx pool.
  - Alternatively, split flows across multiple work queues and run one agent per queue.
- Cap concurrent starts to add backpressure
  - Set a concurrency limit on the work queue (or use flow-run concurrency limits via tags) so only N runs start at once and the rest queue.
  - This is the most reliable way to avoid client bursts that exhaust the pool.
- Stagger scheduling to avoid top-of-minute spikes
  - Add offsets/jitter to schedules so 300 flows don't try to start at the same second (see the sketch after this list).
- Keep current on 2.x, or consider workers
  - You're on 2.18, which is fairly recent, but make sure you're on the latest 2.18 patch.
  - If/when you can, moving to work pools + workers gives you better controls around concurrency and scaling.
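A rough sketch of the staggering idea, assuming Prefect 2.x's Deployment.build_from_flow and CronSchedule (import paths vary slightly across 2.x minor versions); my_flows.etl and the deployment names are hypothetical:

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

from my_flows import etl  # hypothetical flow import; use your own flow

# Register three deployments whose cron schedules are offset by 5 minutes,
# so their runs don't all try to start at the same instant.
for i, group in enumerate(["group-a", "group-b", "group-c"]):
    Deployment.build_from_flow(
        flow=etl,
        name=f"daily-{group}",
        schedule=CronSchedule(cron=f"{i * 5} 0 * * *"),  # 00:00, 00:05, 00:10 daily
        apply=True,  # create/update the deployment via the API
    )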
Notes
- Prefect 2.x does not expose httpx’s `max_connections`/`max_keepalive_connections` via env vars, so you can’t increase the pool size directly.
- If you’re self-hosting the API, also verify upstream limits (reverse proxy keepalive, server workers), but the error you’re seeing originates in the agent’s client pool.
Helpful references
- Agents (Prefect 2.x)
- Work queues (Prefect 2.x)
- Concurrency limits (Prefect 2.x)
If you can share:
- The exact exception text (e.g., httpx.PoolTimeout)
- Whether you’re on Cloud or self-hosted
- How many agents you’re running now and how they’re deployed
…I can propose a concrete rollout (e.g., number of agents, a safe queue concurrency limit, and scheduling tweaks) tailored to your setup.

Abrar Hussain
09/22/2025, 7:48 PM

Marvin
09/22/2025, 7:51 PM
The Dask task runner helps if you can restructure the work: instead of submitting 300 separate flow runs, run one flow that fans the 300 items out as tasks on a Dask cluster. First, install the extras:

pip install prefect-dask dask distributed
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def process_item(i):
    # your work here
    return i * 2


# Local cluster; for a remote scheduler use: DaskTaskRunner(address="tcp://scheduler:8786")
task_runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 16, "threads_per_worker": 1},
)


@flow(name="bulk-run", task_runner=task_runner)
def bulk_run(items: list[int]):
    futures = [process_item.submit(i) for i in items]
    return [f.result() for f in futures]


if __name__ == "__main__":
    bulk_run(list(range(300)))
Caveats
- Tasks still report state to the API, but the request pattern is much gentler than 300 separate flow run submissions.
- If those 300 flows are triggered independently (e.g., external webhooks), you'd need to add a buffering/aggregator layer that batches into one "bulk" flow run; see the sketch below.
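As a minimal sketch of that aggregator idea: the deployment name bulk-run/bulk-run-k8s and fetch_pending_items are hypothetical placeholders for however you buffer incoming items; run_deployment then submits one batch run instead of 300 individual ones.

from prefect import flow
from prefect.deployments import run_deployment


def fetch_pending_items() -> list[int]:
    # Placeholder: read buffered items from wherever your webhooks land
    # (a queue, a table, an object store, ...).
    return list(range(300))


@flow
def aggregate_and_submit():
    items = fetch_pending_items()
    if not items:
        return
    # One flow run for the whole batch instead of one per item.
    run_deployment(
        name="bulk-run/bulk-run-k8s",  # hypothetical "<flow>/<deployment>" name
        parameters={"items": items},
        timeout=0,                     # return immediately; don't wait for the run to finish
    )


if __name__ == "__main__":
    aggregate_and_submit()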
If you can’t restructure into batch flows, Dask won’t solve the agent’s pool exhaustion. In that case:
- Run multiple agents to spread connections across processes.
- Add backpressure (concurrency limits by tag; stagger schedules) so not all 300 start at once.
- Consider migrating to workers/work pools when possible for better concurrency controls.
Docs
- prefect-dask task runner (Prefect 2.x)
- Task runners (Prefect 2.x)
If you share how those 300 are triggered (one deployment vs many, schedule vs event), I can suggest a concrete batching or throttling pattern.