Srul Pinkas
12/09/2024, 11:06 AM
- simply trigger each sub-flow (through run_deployment, in order to allocate separate resources). This creates dozens of sub-flows, but the parent job that tries to keep them updated and manage them crashes with a connection-limit-reached error (an internal httpx crash). I asked Marvin about configuring this in Prefect and didn't see any immediate solution; perhaps more than 4-5 running in parallel is too much for Prefect? Am I missing something?
- create many sub-flows but run them through a managed concurrency-limit queue (say, 3 at any given time; once one is done, another kicks in). Since it's a flow and not a task, I had to go through the work-pool / work-queue limits configuration. The problem: since it takes some time to get the actual machine (mostly for the first batch from the cluster), the yet-to-be-executed sub-flow appears to fail during that window (a -1 signal, until it later succeeds). Because of that temporary failure, other sub-flows also start running and I blow past my limit of 3 (so 6-7 run in parallel), which is odd since it seems like a common glitch. I looked for a parameter that controls how long to wait on that first sub-flow-submit failure to avoid this race edge case, but the only parameter the docs seemed promising about (job_watch_timeout_seconds) is described as "Number of seconds to wait for each event emitted by a job before timing out" - so raising it to 1-2 minutes seems too important/risky.
- The third fallback option I use is creating fewer but chubbier jobs - each running 10 different inputs serially - and running those in parallel. But that seems like a bad solution for a platform that should take care of this.
Did i miss any simple way out of this?
Thanks!
Bianca Hoch
12/09/2024, 7:04 PM
Hi Srul! This option (triggering each sub-flow through run_deployment to allocate separate resources) makes sense to me. Do you have an example of the error that the parent throws?
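One way to avoid the connection-limit crash is to bound how many run_deployment calls are in flight at once. A minimal sketch, assuming Prefect 2.x and a hypothetical deployment named "child-flow/child" with an "item_id" parameter:
```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment

MAX_PARALLEL = 3  # how many sub-flows may run at the same time

@flow
async def parent(ids: list[str]):
    sem = asyncio.Semaphore(MAX_PARALLEL)

    async def launch(item_id: str):
        async with sem:
            # run_deployment polls until the child flow run finishes;
            # timeout=None means "wait as long as it takes".
            return await run_deployment(
                name="child-flow/child",          # hypothetical deployment name
                parameters={"item_id": item_id},  # hypothetical parameter
                timeout=None,
            )

    return await asyncio.gather(*(launch(i) for i in ids))
```
Only MAX_PARALLEL launches (and their polling connections) are active at any moment; the rest wait on the semaphore.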
Bianca Hoch
12/09/2024, 7:05 PM
Srul Pinkas
12/10/2024, 1:45 PM
Srul Pinkas
02/07/2025, 3:36 PM
run_deployment call? if that timeout is reached while running, is it really crashing, or does the job keep on running and once it succeeds it's all good?
Marvin
02/07/2025, 3:36 PM
run_deployment call in Prefect 2.20. However, generally in Prefect, if a timeout is reached during a flow or task execution, the execution is typically halted. The behavior might vary depending on how the timeout is configured and handled within the flow or task logic itself.
To ensure proper handling of timeouts, you might need to explicitly set and manage them in your flow or task configurations. If a task or flow is set to timeout and it does, the execution would stop unless you've implemented logic to handle such scenarios.
You can explore more about timeouts and deployment configurations in the Prefect documentation: Prefect Deployments Guide.
If you have specific configurations or setups, you might want to check those to confirm the behavior.
Srul Pinkas
02/08/2025, 11:31 AM
For a run_deployment job, do I need to use a work_queue_name yaml definition that has a concurrency limit? Is this enough, or do I also need to make sure the work_pool --> name also represents a concurrency-limited entity? (Prefect 2.20)
Marvin
02/08/2025, 11:31 AM
To limit concurrency for a run_deployment job in Prefect 2.20, you need to configure both the work queue and work pool appropriately:
1. Work Queue: You should define a work_queue_name
in your YAML configuration that has a concurrency limit set. This allows you to control how many tasks or flows can run concurrently within that specific queue.
2. Work Pool: Additionally, you need to ensure that the work_pool
itself has a concurrency limit configured. The work pool's concurrency limit acts as an overarching control, ensuring that the total number of concurrent runs across all queues within the pool does not exceed the specified limit.
In summary, both the work queue and the work pool need to have concurrency limits if you want to manage concurrency effectively across your deployments. This setup provides fine-grained control over the execution of runs.
For more detailed instructions, you might want to refer to the Prefect documentation on work pools and queues.
Srul Pinkas
02/11/2025, 1:02 PM
For my task-wrapped functions, how do I control the concurrency to not go above 5 at the same time?
Marvin
02/11/2025, 1:03 PM
1. Tag Your Tasks: Assign a tag such as "limited_concurrency" to the tasks you want to limit.
2. Set Concurrency Limit: Use Prefect's task concurrency limits to specify the maximum number of concurrent task runs for tasks with the given tag. You can set this limit to 5.
Here's a high-level example of how you might implement this:
python
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(tags=["limited_concurrency"])
def my_task():
    # Task logic here
    pass

@flow
def my_flow():
    for _ in range(10):
        my_task()

if __name__ == "__main__":
    my_flow()
To enforce the concurrency limit, you would configure it using the Prefect CLI or UI:
bash
prefect concurrency-limit create --tag limited_concurrency --limit 5
This setup ensures that no more than 5 tasks with the "limited_concurrency" tag run at the same time. Further details can be found in the Prefect Tasks Documentation.
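As a usage sketch (an addition, not from the thread): the tag limit caps concurrent task runs, which matters most when the flow submits tasks concurrently with .submit() instead of calling them one by one.
```python
from prefect import flow, task

@task(tags=["limited_concurrency"])
def my_task(n: int) -> int:
    # Stand-in for real work
    return n * 2

@flow
def my_flow():
    # .submit() launches task runs concurrently; the "limited_concurrency"
    # tag limit configured via the CLI/UI keeps at most 5 running at once.
    futures = [my_task.submit(i) for i in range(20)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    my_flow()
```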
Srul Pinkas
02/11/2025, 1:03 PM
And where do I define that - in the @task decorator, or when I do the actual submit?
Marvin
02/11/2025, 1:04 PM
Neither - you don't set the limit in the @task decorator or when submitting tasks. Concurrency limits are managed via task tags and configured outside of the code, typically using the Prefect CLI or UI.
Here's a concise approach to manage concurrency:
1. Tag your tasks: Use the tags
parameter in the @task
decorator to assign a specific tag to tasks you want to control. For example:
python
from prefect import task, flow

@task(tags=["limited_concurrency"])
def my_task():
    # Task logic here
    pass
2. Set concurrency limits using the CLI: Use the Prefect CLI to set a concurrency limit on the tag:
bash
prefect concurrency-limit create --tag limited_concurrency --limit 5
This approach ensures that no more than 5 tasks with the "limited_concurrency" tag run concurrently. Unfortunately, setting concurrency limits directly in code is not supported, and you'll need to use external configuration for this purpose.
Srul Pinkas
02/13/2025, 12:36 PM
Marvin
02/13/2025, 12:37 PM
You can use the SlackCredentials block. Here's how you can load the stored Slack credentials:
1. Load the Slack Credentials Block:
python
from prefect_slack import SlackCredentials
# Replace "BLOCK_NAME" with the name of your block
slack_credentials_block = SlackCredentials.load("BLOCK_NAME")
2. Get the Slack Client:
Once you've loaded the block, you can get a Slack client to interact with Slack:
python
client = slack_credentials_block.get_client()
3. Webhook URL:
If you need to set up a webhook, you can refer to Prefect's webhook documentation for managing webhook URLs. Prefect Cloud assigns a webhook URL when you create a webhook, and it looks something like <https://api.prefect.cloud/hooks/your-webhook-id>
.
For more details, you can refer to the Slack Credentials documentation.
If you need to create a webhook, you can find more information in the webhook guide.
Let me know if you need further assistance!
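To round out the snippets above, a hedged sketch of actually posting a message with the loaded credentials; it assumes get_client() returns slack_sdk's async web client (as in recent prefect-slack releases) and that a block named "BLOCK_NAME" and a channel "#alerts" exist:
```python
import asyncio

from prefect_slack import SlackCredentials

async def notify(text: str) -> None:
    # Load the stored credentials block (replace "BLOCK_NAME" with yours)
    creds = await SlackCredentials.load("BLOCK_NAME")
    client = creds.get_client()
    # chat_postMessage is the standard slack_sdk call for posting to a channel
    await client.chat_postMessage(channel="#alerts", text=text)

if __name__ == "__main__":
    asyncio.run(notify("Hello from Prefect"))
```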
Srul Pinkas
02/19/2025, 3:00 PM
The Slack notification shows localhost instead of a link - where can I configure it to be a proper link?
Marvin
02/19/2025, 3:00 PM
If your notifications show localhost, you should look into how the notification URLs are being templated. Prefect 3.x allows you to use templated variables for dynamic content in notifications. Specifically, you can use the {{ flow_run|ui_url }}
token to include the correct URL to view the flow run in the Prefect UI.
Here are some steps you can take:
1. Check your Automation Configuration: Make sure your automation configuration is set up correctly to use the ui_url
token. This will ensure that the correct URL is included in your notifications.
2. Use Jinja Templating: Prefect supports Jinja templating, which allows you to dynamically include details such as the flow or pool name. For example, you can use {{ flow_run|ui_url }}
to insert the URL to the flow run in your Slack message.
3. Update Notification Settings: Ensure that your notification settings in the Prefect configuration or your deployment YAML are properly utilizing these tokens.
For more details, you can refer to the Prefect documentation on webhook triggers and automation triggers.
If the issue persists, make sure there are no misconfigurations in your deployment or webhook settings that might be defaulting to localhost.
If you need further assistance with specific configurations, feel free to ask!
Srul Pinkas
02/19/2025, 3:00 PM
Marvin
02/19/2025, 3:01 PM
If the links point to localhost, it's likely due to how the URLs are being generated or configured within your setup. Here are some steps and resources to help you configure it correctly:
1. Slack Webhook Configuration: Ensure that your Slack webhook is correctly set up and that you are using the right URL for notifications. You can find more about configuring Slack webhooks in the Prefect Slack integration documentation.
2. Notification Settings: Check your notification settings in Prefect. You can configure notifications to trigger on specific flow run states and specify the Slack webhook URL. More details can be found in the Prefect notifications guide.
3. Flow Run URLs: If the flow run URLs are pointing to localhost
, ensure that your Prefect server or agent is correctly configured to use the appropriate hostname or IP address. This might involve setting environment variables or adjusting your Prefect server configuration.
4. Use of Tokens: Make sure that you are using the correct tokens or placeholders in your notification messages to dynamically generate the correct URLs.
If the issue is due to a bug or a misconfiguration, reviewing your Prefect server and deployment settings might be necessary. If you need more specific guidance, feel free to provide additional details about your setup!
Srul Pinkas
02/19/2025, 3:03 PM
Marvin
02/19/2025, 3:03 PM
To stop the generated links from pointing to localhost, you can adjust the following environment variables:
1. PREFECT_API_URL
: This environment variable should be set to the external URL of your Prefect server's API. For example, if your server is accessible at <https://your-domain.com>
, you would set:
bash
export PREFECT_API_URL="<https://your-domain.com/api>"
2. PREFECT_UI_API_URL
: This is specifically for the UI. It should be set to the external proxy URL. If not set, it defaults to PREFECT_API_URL
. To ensure proper configuration, set it explicitly:
bash
export PREFECT_UI_API_URL="<https://your-domain.com/api>"
These configurations will ensure that links generated in notifications point to the correct domain rather than localhost
.
For more detailed guidance on configuring your Prefect server, you can refer to the Prefect hosting guide and the settings guide. Adjust your server configuration and restart your server to apply these changes.
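A quick way to sanity-check what a running process will actually use is to read the settings from Python; a minimal sketch, assuming Prefect 2.x where PREFECT_API_URL and PREFECT_UI_URL are exposed in prefect.settings:
```python
from prefect.settings import PREFECT_API_URL, PREFECT_UI_URL

# If either of these resolves to a localhost address, links built from them
# (including the ones embedded in notifications) will point at localhost too.
print("PREFECT_API_URL:", PREFECT_API_URL.value())
print("PREFECT_UI_URL:", PREFECT_UI_URL.value())
```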
Srul Pinkas
02/20/2025, 5:16 PM
We're seeing a long hang and eventually a crash on the work_pool = await client.read_work_pool(deploy_config["work_pool"]) line inside Prefect's internal code, from our CI. The pool names from the yaml are configured on the server. Can you think why this would happen? I think this might be the first code line to interact with the client.
Marvin
02/20/2025, 5:16 PM
The read_work_pool line taking a long time and eventually crashing might be related to a few potential causes:
1. Work Pool Configuration: Ensure that your work pool is properly configured and that there are active workers connected. A work pool needs at least one online worker sending heartbeats to be considered ready to execute work. You can manage work pools using the Prefect CLI.
2. Network or API Issues: There might be network issues or API endpoint misconfigurations causing the delay. Verify that your Prefect server is accessible and that the API URL is correctly set. You can set the PREFECT_API_URL
environment variable to ensure the client connects to the right endpoint.
3. Timeouts and Retries: Consider configuring timeouts and retries for your client connections if network instability is an issue. This can help handle transient failures more gracefully.
4. Version Mismatch or Bugs: Ensure that your Prefect server and client versions are compatible. Check the Prefect GitHub issues for any known bugs related to read_work_pool
or similar functions that might affect version 2.20.
For more details on managing work pools, you can refer to the work pools documentation. If the problem persists, you might want to look into logs for more specific error messages or consider reaching out to the Prefect community for support.
Srul Pinkas
02/24/2025, 3:15 PM
0 0 * * 3 means midnight Wednesday once a week? @Marvin
Marvin
02/24/2025, 3:15 PM
Yes - cron fields are ordered min/hour/day/month/weekday. Therefore, the expression 0 0 * * 3 indeed schedules a task to run at midnight (00:00) every Wednesday.
For more details on scheduling in Prefect, you can refer to the Prefect 2.x scheduling documentation.
If you have any more questions about scheduling or anything else, feel free to ask.
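To double-check a cron expression like this locally, a small sketch using the third-party croniter package (an assumption, it is not part of Prefect) prints the next few fire times:
```python
from datetime import datetime

from croniter import croniter  # pip install croniter

# "0 0 * * 3": minute 0, hour 0, any day-of-month, any month, weekday 3 (Wednesday)
it = croniter("0 0 * * 3", datetime(2025, 2, 24))
for _ in range(3):
    print(it.get_next(datetime))  # expect consecutive Wednesdays at 00:00
```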
Srul Pinkas
02/26/2025, 10:08 AM
What's the day_or meaning in the yaml?
Marvin
02/26/2025, 10:08 AM
The day_or parameter in the YAML schedule configuration specifies how the day-of-month and day-of-week entries in a cron expression are interpreted.
- If day_or is set to True, the schedule will run if either the day-of-month or the day-of-week condition is satisfied. The conditions act as an OR operation.
- If day_or is set to False, both the day-of-month and the day-of-week conditions must be satisfied for the schedule to run, acting as an AND operation.
This behavior allows for more flexible scheduling options, depending on whether you want the task to run on specific days of the week, specific days of the month, or both.
For more details, you can refer to the Prefect 2.x documentation on scheduling.
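The same toggle is visible on the schedule object itself; a minimal sketch, assuming the Prefect 2.x import path prefect.client.schemas.schedules (verify against your installed version):
```python
from prefect.client.schemas.schedules import CronSchedule

# day_or=True (the default): day-of-month OR day-of-week may match.
# day_or=False: both fields must match (AND semantics).
schedule = CronSchedule(cron="0 0 * * 3", timezone="UTC", day_or=True)
print(schedule)
```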
Srul Pinkas
02/27/2025, 12:54 PM
Marvin
02/27/2025, 12:55 PM
Srul Pinkas
02/28/2025, 11:13 AM
Marvin
02/28/2025, 11:13 AM
Srul Pinkas
02/28/2025, 11:14 AM
Marvin
02/28/2025, 11:14 AM
Srul Pinkas
03/03/2025, 11:26 AM
Marvin
03/03/2025, 11:26 AM
You can use prefect.context to access certain runtime information, though tags specifically might not be directly accessible this way.
If you are looking for a more concrete solution, you can check the Prefect documentation on flows for any updates or detailed methods to access flow metadata programmatically.
If you have further questions or need more specific guidance, feel free to ask!
Srul Pinkas
03/03/2025, 3:35 PM
There's a deployment_id - from which I can probably get the deployment tags (from the yaml), right? Can I get a snippet to extract the deployment tags?
Also, since I'm sometimes running locally - can I do it safely so it doesn't crash in case I don't have any deployment and am just running stuff in local Python with no server?
Marvin
03/03/2025, 3:35 PM
You can get the deployment_id from the prefect.runtime.flow_run module. If you're running the flow as part of a deployment, you can use this deployment_id
to fetch deployment details and tags. Here's a snippet to safely extract deployment tags, even if running locally without a deployment:
python
from prefect.runtime.flow_run import id as flow_run_id
from prefect.client import get_client

async def get_deployment_tags():
    try:
        # Only proceed if there's a flow run ID, indicating a deployment
        if not flow_run_id:
            print("Running locally, no deployment ID available.")
            return None
        async with get_client() as client:
            # Fetch the flow run details
            flow_run = await client.read_flow_run(flow_run_id)
            deployment_id = flow_run.deployment_id
            # Fetch deployment details using the deployment ID
            if deployment_id:
                deployment = await client.read_deployment(deployment_id)
                return deployment.tags
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

# Example usage
import asyncio
tags = asyncio.run(get_deployment_tags())
print(f"Deployment tags: {tags}")
This code checks if there's a flow_run_id, which indicates a deployment context. If not, it assumes you're running locally and exits gracefully. If there is a deployment context, it fetches the deployment details and retrieves the tags.
Srul Pinkas
03/03/2025, 3:47 PM
So there's no need to use the flow object at all? Or just use the context to get the live stats?
Marvin
03/03/2025, 3:48 PM
Correct - you don't need the flow
object to access runtime information when you are trying to get live stats or metadata about the current flow run. Using the prefect.runtime
context is sufficient for accessing this kind of information. The prefect.runtime.flow_run
module provides direct access to various attributes of the flow run, such as its ID, tags, and other metadata.
In the example I provided, I used prefect.runtime.flow_run.id
to check if the code is running within a deployment and to retrieve the deployment ID. This approach is efficient and aligns with accessing runtime metadata directly.
If you have any more questions or need further clarification, feel free to ask!
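A minimal sketch of reading live run metadata through prefect.runtime (Prefect 2.x); the attributes fall back to empty/None values outside of a flow run, so it is also safe in plain local Python:
```python
from prefect import flow
from prefect.runtime import flow_run

@flow
def my_flow():
    # Populated from the runtime context of the current flow run
    print("flow run id:", flow_run.id)
    print("flow run name:", flow_run.name)
    print("tags:", flow_run.tags)

if __name__ == "__main__":
    my_flow()
```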
Srul Pinkas
03/10/2025, 11:48 AM
Marvin
03/10/2025, 11:48 AM
Srul Pinkas
03/10/2025, 11:50 AM
Marvin
03/10/2025, 11:50 AM
python
from prefect import task, Flow
from prefect.utilities.notifications import slack_notifier
from prefect.engine.state import Failed, Pending, Running

handler = slack_notifier(only_states=[Failed])

@task
def my_task():
    pass

with Flow("my_flow", state_handlers=[handler]) as flow:
    my_task()
3. Review Notifications Documentation: The Prefect 2.x documentation on notifications explains how to set up and customize notifications. It might offer additional insights into managing state-based notifications.
4. Community Discussions: If these solutions don't resolve your issue, consider engaging with the Prefect community or submitting a feature request on GitHub. There might be others with similar challenges who have found workarounds.
These steps should help you better control when notifications are sent, reducing false alarms.
Srul Pinkas
03/12/2025, 12:16 PM
Can I re-run a task through the UI, in retro? If it's completed I don't see a button. Is that true also for failed ones? (prefect 2.20)
Marvin
03/12/2025, 12:16 PM
Srul Pinkas
09/10/2025, 4:31 PM
Marvin
09/10/2025, 4:32 PM
Marvin
09/10/2025, 4:32 PM
Srul Pinkas
09/10/2025, 4:33 PM
Marvin
09/10/2025, 4:33 PM
Srul Pinkas
09/10/2025, 4:36 PM
Marvin
09/10/2025, 4:36 PM
Marvin
09/10/2025, 4:36 PM
Marvin
09/10/2025, 4:46 PM
prefect flow-run cancel <PARENT_FLOW_RUN_ID>
You can also click “Cancel” in the UI on the parent run. If the pod doesn’t terminate promptly after cancelling, manually delete the Kubernetes job/pod.
Why this happened
- If your parent launched child runs via run_deployment(...)
or create_flow_run
and then waited on them, it’s waiting on those specific run IDs.
- Manually re-running a child from the UI often creates a new run; the parent won’t notice that new run finishing.
- One more gotcha: wait_for_flow_run
has a default 3h timeout. If your parent used it without overriding timeout
, it may have stopped waiting after 3 hours.
What to do next
- If you only need the parent marked done (and don’t need wrap-up code to execute), cancelling is the cleanest way to stop the pod. Starting a fresh “wrap-up” flow that checks downstream results (and does any finalization) is safer than trying to force the parent to continue.
- If you do need the parent’s wrap-up code to run, re-run the parent with logic that skips work for children that are already Completed.
Patterns to use going forward
1) Call subflows as functions (best for tight orchestration)
from prefect import flow

@flow
def child(item_id: str):
    ...

@flow
def parent(ids: list[str]):
    for i in ids:
        child.with_options(name=f"child-{i}")(i)  # waits for each child
2) If you use deployments, explicitly wait for the exact child runs you started and remove the timeout
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
def parent(ids: list[str]):
    runs = []
    for i in ids:
        fr = run_deployment(
            name="child-flow/deployment",
            parameters={"id": i},
            as_subflow=True,  # ties the run to this parent
            timeout=None,     # do not stop waiting early
        )
        runs.append(fr)
    for fr in runs:
        wait_for_flow_run(fr.id, timeout=None, log_states=True)
3) Prefer configuring retries on the child flow (so the same run ID retries) instead of manually re-running
@flow(retries=3, retry_delay_seconds=60)
def child(...):
    ...
A couple questions so I can give you targeted fixes:
- How did the parent launch the subflows: direct @flow calls or run_deployment?
- How did you retrigger the children: UI “Retry” on the failed run, or “Run”/“Rerun” on the deployment (creating new runs)?
- Are you using a Kubernetes worker/pool for the parent pod?
If you share those details or a minimal snippet of the parent orchestration, I can suggest the smallest change to make this recoverable and prevent it in the future.
Srul Pinkas
10/07/2025, 1:37 PM
Marvin
10/07/2025, 1:37 PM
Marvin
10/07/2025, 1:41 PM
prefect work-pool pause "<pool-name>"
- You can also stop/scale down your workers. This prevents any new runs from executing.
- Docs: Work pools
If you want to stop new scheduled runs from being created
- Self-hosted Prefect Server only: disable the scheduler service globally and restart the API
- Set env var on your Prefect API service:
PREFECT_API_SERVICES_SCHEDULER_ENABLED=false
- This stops all scheduled flow runs from being created until you re-enable it.
- Prefect Cloud: there’s no workspace-level “disable scheduler” switch. You’ll need to pause/clear schedules per deployment:
- List schedules for a deployment:
prefect deployment schedule ls "<deployment-name>"
- Pause a specific schedule:
prefect deployment schedule pause "<deployment-name>" <schedule-id>
- Remove all schedules for a deployment (you can re-create them by re-deploying later):
prefect deployment schedule clear "<deployment-name>"
- Schedules docs: Schedules in Prefect 2
Notes on bulk operations
- There isn’t a one-click bulk pause/clear for all deployments in 2.20.
- Practical approaches:
- If you have a small number of work pools, pausing pools is the fastest way to stop spend immediately.
- If you need to truly stop new runs from being created, you can script across deployments with the CLI (e.g., iterate over deployment names and call prefect deployment schedule clear "<deployment>"
). I’m happy to provide a small script if you can share how your deployments are named or whether you’re on Mac/Linux/Windows.
Quick clarifications to tailor the steps:
1) Are you on Prefect Cloud or self-hosted Prefect Server?
2) Do you want to stop runs from being created at all, or just prevent execution to save compute?
Srul Pinkas
10/07/2025, 2:54 PM
Each deployment seems to have both an active/paused state and schedules, as the UI also mentions the specific times. I assume both need to be on in order to run. Can I easily query all deployments that have BOTH on, to see which are problematically set to run?
Marvin
10/07/2025, 2:54 PM
Marvin
10/07/2025, 2:56 PM
- List schedules for a deployment:
prefect deployment schedule ls "<deployment-name>"
- Pause a specific schedule:
prefect deployment schedule pause "<deployment-name>" <schedule-id>
- Clear all schedules for a deployment:
prefect deployment schedule clear "<deployment-name>"
Srul Pinkas
10/07/2025, 2:57 PM
Marvin
10/07/2025, 2:57 PM
Marvin
10/07/2025, 2:59 PM
import asyncio
from typing import List, Dict

try:
    # Prefect 2.10+ exposes get_client here
    from prefect.client import get_client
except ImportError:
    # Older 2.x fallback
    from prefect.client.orion import get_client  # type: ignore

async def find_active_deployments_with_enabled_schedules() -> List[Dict]:
    rows = []
    async with get_client() as client:
        offset = 0
        limit = 200
        while True:
            deployments = await client.read_deployments(offset=offset, limit=limit)
            if not deployments:
                break
            for d in deployments:
                # Deployment must be active
                if getattr(d, "paused", False):
                    continue
                # Get full deployment so we can inspect schedules
                full = await client.read_deployment(d.id)
                schedules = getattr(full, "schedules", []) or []
                active_schedules = [s for s in schedules if getattr(s, "active", False)]
                if not active_schedules:
                    continue
                flow_name = getattr(full, "flow_name", None)
                name = f"{flow_name}/{full.name}" if flow_name else full.name
                rows.append(
                    {
                        "name": name,
                        "deployment_id": str(full.id),
                        "work_pool": getattr(full, "work_pool_name", None)
                        or getattr(full, "work_queue_name", None),
                        "active_schedule_count": len(active_schedules),
                        # Optional: include schedule IDs/times
                        "schedule_ids": [str(s.id) for s in active_schedules],
                    }
                )
            offset += limit
    return rows

if __name__ == "__main__":
    results = asyncio.run(find_active_deployments_with_enabled_schedules())
    print(f"Deployments active with enabled schedules: {len(results)}")
    for r in results:
        print(
            f"- {r['name']} "
            f"(id={r['deployment_id']}, pool={r['work_pool']}, "
            f"active_schedules={r['active_schedule_count']}, "
            f"schedule_ids={r['schedule_ids']})"
        )
If you’d like, I can tweak this to:
- dump to CSV/JSON,
- include schedule expressions/next times,
- or directly pause/clear schedules for everything it finds.
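As a small follow-up to the first bullet, a minimal stdlib-only sketch that dumps the rows returned by find_active_deployments_with_enabled_schedules() above to a CSV file for review:
```python
import asyncio
import csv

def dump_rows_to_csv(rows: list[dict], path: str = "active_scheduled_deployments.csv") -> None:
    # `rows` is the output of find_active_deployments_with_enabled_schedules() above
    fields = ["name", "deployment_id", "work_pool", "active_schedule_count", "schedule_ids"]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        writer.writerows(rows)

if __name__ == "__main__":
    found = asyncio.run(find_active_deployments_with_enabled_schedules())  # defined in the script above
    dump_rows_to_csv(found)
    print(f"wrote {len(found)} rows")
```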
Reference: Schedules in Prefect 2
Srul Pinkas
10/08/2025, 1:51 PM
15:28:54.822 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 20.682503 seconds to run, which is longer than its loop interval of 5 seconds.
result = await greenlet_spawn(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn
result = context.switch(value)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
return self._execute_internal(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal
result: Result[Any] = compile_state_cls.orm_execute_statement(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
result = conn.execute(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
return meth(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
return connection._execute_clauseelement(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
return self._exec_single_context(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2358, in _handle_dbapi_exception
raise exc_info[1].with_traceback(exc_info[2])
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
self.dialect.do_execute(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute
self._adapt_connection.await_(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
value = await result
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute
self._handle_exception(error)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 786, in _handle_exception
raise error
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 538, in _prepare_and_execute
self._rows = deque(await prepared_stmt.fetch(*parameters))
File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
data, status, _ = await self.__do_execute(
File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
return await executor(protocol)
File "asyncpg/protocol/protocol.pyx", line 207, in bind_execute
asyncio.exceptions.TimeoutError
15:28:54.822 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 20.682503 seconds to run, which is longer than its loop interval of 5 seconds.
This also coincides with a bump we saw in our RDS Prefect-DB costs. Do you see what the issue is?
Marvin
10/08/2025, 1:51 PM
Marvin
10/08/2025, 1:55 PM
# Slow the loops, reduce work per cycle
PREFECT_SERVER_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_SERVER_SERVICES_SCHEDULER_LOOP_SECONDS=60
PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_SERVER_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_SERVER_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
# Narrow the scheduling horizon (defaults: min=PT1H, max=P100D)
PREFECT_SERVER_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_SERVER_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
DB tuning knobs (per server/services process)
- Pool sizing: larger pools reduce connection wait, but raise total RDS connections (and cost). Tune carefully.
PREFECT_SERVER_DATABASE_POOL_SIZE=10
PREFECT_SERVER_DATABASE_MAX_OVERFLOW=10
PREFECT_SERVER_DATABASE_POOL_TIMEOUT=30
PREFECT_SERVER_DATABASE_POOL_RECYCLE=3600
PREFECT_SERVER_DATABASE_CONNECT_ARGS__APPLICATION_NAME=prefect-server
# Optional: reduce prepared/statement cache sizes if memory pressure is high
PREFECT_SERVER_DATABASE_CONNECT_ARGS__PREPARED_STATEMENT_CACHE_SIZE=100
PREFECT_SERVER_DATABASE_CONNECT_ARGS__STATEMENT_CACHE_SIZE=100
Settings reference: Prefect 3 settings reference
Database side checks
- Enable slow query logging (e.g., log_min_duration_statement) or use RDS Performance Insights to confirm which statements are hot; look for scheduling queries hitting deployments and schedules tables.
- Ensure autovacuum is effective; vacuum/analyze if there’s bloat.
- If needed, increase DB instance size/IO temporarily to clear backlog.
- Consider setting a Postgres statement_timeout at the DB/parameter-group level to fail fast on pathological queries.
Upgrade note
- If you’re not on the latest 3.x, upgrading is recommended — we’ve shipped numerous scheduler and DB performance improvements over time. Release notes: Prefect releases
A few clarifying questions to pinpoint this faster
- Which Prefect version are you on?
- Roughly how many deployments and which schedule types are you using? Any RRule schedules with MINUTELY/BYSECOND?
- How many Prefect API/services processes are running (each one has its own connection pool)?
- Any recent growth in flow_runs or events volume around when the RDS cost spiked?
If you can share your scheduler-related env settings and a sample of the schedule definitions that run most frequently, I can suggest a tighter set of values tailored to your load.
Srul Pinkas
10/08/2025, 1:58 PM
Marvin
10/08/2025, 1:58 PM
Marvin
10/08/2025, 1:59 PM
Note that the settings I listed above are Prefect 3.x names prefixed with PREFECT_SERVER_.... If you were looking for PREFECT_SERVER_* keys in a 2.x server, they won't exist.
Yes, the log name is expected
- The log component prefect.server.services.recentdeploymentsscheduler
exists in 2.x as well. The warning “RecentDeploymentsScheduler took X seconds…” means that pass of scheduling took longer than its loop interval (the “recent deployments” loop defaults to 5 seconds in 2.x).
Prefect 2.20 scheduler defaults and env var names
- These are the common 2.x knobs and their defaults. If you don’t set the env vars, Prefect uses these defaults:
Scheduler service (2.x)
- PREFECT_API_SERVICES_SCHEDULER_ENABLED = true
- PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS = 60
- PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS = 5
- PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE = 100
- PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE = 500
- PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS = 100
- PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS = 3
- PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME = PT1H (1 hour)
- PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME = P100D (100 days)
Database pool (2.x)
- PREFECT_API_DATABASE_POOL_SIZE = 5
- PREFECT_API_DATABASE_MAX_OVERFLOW = 10
- PREFECT_API_DATABASE_POOL_TIMEOUT = 30
- PREFECT_API_DATABASE_POOL_RECYCLE = 3600
- PREFECT_API_DATABASE_CONNECTION_URL = your DSN (this is the one you typically have set)
How to view the effective config (with defaults)
- Exec into the API/server pod and run:
prefect config view --show-defaults | grep -i -e api.services.scheduler -e api.database -e SCHEDULER -e DATABASE
- You can also narrow in on scheduler:
prefect config view --show-defaults | sed -n '/api.services.scheduler/,+25p'
Why the warning + RDS cost spike
- In 2.20 this usually means the scheduler’s DB queries are slow/expensive — often due to frequent schedules (e.g., RRule MINUTELY/BYSECOND), a wide scheduling horizon, many deployments, or DB resource/connection pressure. Each pass tries to generate upcoming runs; if that work takes > loop interval, it falls behind and keeps hammering the DB.
If you want, share:
- How many deployments and which schedule types you have (especially any RRule schedules)
- Your DB instance size and observed connections
- Whether you’re running multiple API/services pods (each has its own pool)
I can suggest concrete 2.x tuning values to reduce load. Also, if/when you’re ready, upgrading to 3.x includes additional scheduler and DB performance improvements; I’m happy to outline a path when it’s helpful.
Refs
- 2.x settings guide: Prefect 2 settings
- 2.x schedules overview: Schedules in Prefect 2
Srul Pinkas
10/08/2025, 2:03 PM
PREFECT_API_DATABASE_CONNECTION_TIMEOUT='30.0' (from env)
PREFECT_API_DATABASE_CONNECTION_URL='********' (from env)
PREFECT_API_DATABASE_ECHO='False' (from defaults)
PREFECT_API_DATABASE_MIGRATE_ON_START='True' (from defaults)
PREFECT_API_DATABASE_PASSWORD='********' (from defaults)
PREFECT_API_DATABASE_TIMEOUT='10.0' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE='100' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_ENABLED='True' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE='500' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS='60.0' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS='100' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME='100 days, 0:00:00' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS='3' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME='1:00:00' (from defaults)
I'm still not sure what the root cause is: if we're not running anything crazy on the server, why wouldn't 20 seconds be enough for a (simple) query? What should I do to make it aligned?
Marvin
10/08/2025, 2:03 PM
Marvin
10/08/2025, 2:04 PM
Your PREFECT_API_DATABASE_TIMEOUT=10.0 means any one DB operation exceeding 10s will time out at the client. That matches the asyncpg TimeoutError you're seeing. The 20s loop time includes retries/await plus the rest of the loop.
Most actionable next steps
Start with visibility so you fix the real cause rather than just raising timeouts.
1) Identify the slow/blocked queries
- Temporarily enable slow query logging on RDS (e.g., set log_min_duration_statement=500ms) or use Performance Insights to see the top statements during scheduler spikes.
- Check for locks/contention:
- Query pg_stat_activity for waiting queries:
SELECT pid, wait_event_type, wait_event, state, query
FROM pg_stat_activity
WHERE datname = current_database()
ORDER BY state, wait_event_type;
- If you can enable pg_stat_statements, look for the top queries by total time.
2) Reduce scheduler work per loop (2.x env vars)
- These are 2.x names (prefix PREFECT_API_…). Try these first:
# Slow the “recent deployments” loop (default is 5s)
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
# Reduce per-loop work and batching
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
# Narrow the scheduling horizon (defaults: min=PT1H, max=P100D)
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
- These changes reduce how much the scheduler must compute/insert each pass.
3) Loosen the client timeout while you tune
- Your client timeout is currently 10s:
PREFECT_API_DATABASE_TIMEOUT=30
- Raising to 30–60 can stop spurious timeouts while you address the root cause. Don’t leave it too high long-term if queries are truly slow.
4) Right-size DB pooling and check total connections
- Defaults are modest but multiply by the number of API/services pods. If you have several pods, connection counts can spike.
PREFECT_API_DATABASE_POOL_SIZE=5 # or 10 if your DB can handle it
PREFECT_API_DATABASE_MAX_OVERFLOW=10
PREFECT_API_DATABASE_POOL_TIMEOUT=30
PREFECT_API_DATABASE_POOL_RECYCLE=3600
- Keep an eye on RDS max connections and CPU/IO. If the DB is saturated, the scheduler will suffer.
5) Check schedules and data volume
- Do you have any minute/second-level RRule schedules (e.g., MINUTELY, BYSECOND)? Those are known to be heavy. Prefer Cron/Interval schedules or reduce frequency.
- If you have millions of old flow_runs/flow_run_states/logs, scheduling queries can degrade. Ensure autovacuum/analyze are effective; consider manual VACUUM (FULL only with downtime) and REINDEX if bloat is high.
6) Optional: surface queries from Prefect
- For a short diagnostic window, you can set:
PREFECT_API_DATABASE_ECHO=true
This logs SQL from the API process. It’s very verbose; use briefly in non-peak periods.
What I’d do in your shoes
- Immediately bump PREFECT_API_DATABASE_TIMEOUT
to 30, and set:
```
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
```
Marvin
10/08/2025, 2:04 PM
Srul Pinkas
10/08/2025, 2:21 PM
Marvin
10/08/2025, 2:21 PM
Marvin
10/08/2025, 2:23 PM
Very large flow_run_state / task_run_state tables can definitely slow the scheduler.
What query is “stuck”?
- From the log snippet alone, we can’t see the exact SQL — Prefect 2.x does not log SQL by default. The asyncpg TimeoutError just tells us a DB call exceeded your PREFECT_API_DATABASE_TIMEOUT
(10s in your config).
- To identify the exact statement, use Postgres visibility:
- See currently running/waiting queries:
SELECT pid,
now() - query_start AS runtime,
state,
wait_event_type,
wait_event,
application_name,
query
FROM pg_stat_activity
WHERE datname = current_database()
AND state <> 'idle'
ORDER BY query_start;
- If you can, enable pg_stat_statements and get the top time-consuming statements:
-- enable once in your DB/parameter group:
-- CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
SELECT total_exec_time, mean_exec_time, calls, query
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 20;
- RDS Performance Insights (if enabled) will also show you the exact SQL and wait events.
Optional: label Prefect connections so they’re easy to spot
- Add an application name to your DSN so queries show up labeled:
- Append ?application_name=prefect-api
to PREFECT_API_DATABASE_CONNECTION_URL
- After rolling the pod, look for application_name = 'prefect-api'
in pg_stat_activity
.
What the scheduler typically does
- Reads deployments with active schedules
- Computes upcoming run times (in Python)
- Inserts new rows into flow_runs
and corresponding states in batches
- Checks for duplicates/limits
If the state/run tables are very large and bloated or missing stats, even simple existence checks and inserts can slow down.
Safe ways to reduce table size quickly
1) Delete old flow runs (cascades states and task runs)
- In 2.x, FK constraints cascade states and task runs when you delete flow runs. If you’re comfortable purging old history, this is the fastest way to shrink *_state
tables.
- Example: delete completed flow runs older than 30 days. Adjust filters to your needs and test on a small window first.
-- recommended: create an index if not present to help this query:
-- CREATE INDEX IF NOT EXISTS idx_flow_runs_end_time ON flow_runs (end_time);
DELETE FROM flow_runs
WHERE end_time IS NOT NULL
AND end_time < now() - interval '30 days';
This will cascade delete from flow_run_states
, task_runs
, and task_run_states
via FKs.
2) Prune historical states older than N days while keeping the current state
- If you want to keep runs but trim old state history:
-- Flow run states: keep the state referenced by flow_runs.state_id, delete older ones
DELETE FROM flow_run_states s
USING flow_runs r
WHERE s.flow_run_id = r.id
AND s.id <> r.state_id
AND s.timestamp < now() - interval '30 days';
-- Task run states: same idea
DELETE FROM task_run_states s
USING task_runs r
WHERE s.task_run_id = r.id
AND s.id <> r.state_id
AND s.timestamp < now() - interval '30 days';
Notes:
- Ensure timestamp
exists in your schema (it does in 2.x).
- Do this in batches to avoid long locks, e.g., add LIMIT
with a CTE or slice by time ranges.
3) Maintenance after purge
- Update stats and reclaim space:
VACUUM (ANALYZE) flow_runs;
VACUUM (ANALYZE) flow_run_states;
VACUUM (ANALYZE) task_runs;
VACUUM (ANALYZE) task_run_states;
For significant bloat and if you can afford locks/downtime: VACUUM FULL
on the biggest tables or REINDEX TABLE ...
.
Tune the scheduler/db to stop the bleeding
- In 2.x (your case), try these environment variables:
```
# Slow the “recent deployments” loop to reduce thrash
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
# Reduce work per pass
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
```
Marvin
10/08/2025, 2:23 PM
- Once you spot the slow statements in pg_stat_activity or Performance Insights, run EXPLAIN (ANALYZE, BUFFERS) on them to see if you need additional indexes. Common helpful indexes:
- flow_runs(deployment_id)
- flow_runs(end_time)
- flow_run_states(flow_run_id, timestamp)
- task_run_states(task_run_id, timestamp)
- If you share the slow query text, I can suggest the exact index.
Can logs show the SQL directly?
- Not by default. You can temporarily set PREFECT_API_DATABASE_ECHO=true
to log SQL from the API process (very verbose — use briefly and preferably off-peak).
If you’d like, post:
- One or two of the longest-running queries you see in pg_stat_activity
or Performance Insights
- Approximate row counts of flow_runs
, task_runs
I’ll help interpret and propose precise indexes and a pruning plan.Srul Pinkas
10/08/2025, 2:30 PMMarvin
10/08/2025, 2:30 PMMarvin
10/08/2025, 2:31 PMSELECT relname, n_live_tup, n_dead_tup
FROM pg_stat_user_tables
WHERE relname IN ('flow_run_states','task_run_states');
SELECT relname, pg_size_pretty(pg_total_relation_size(relid)) AS total
FROM pg_catalog.pg_statio_user_tables
WHERE relname IN ('flow_run_states','task_run_states');
-- average states per run (sanity checks)
SELECT
(SELECT COUNT(*) FROM flow_run_states) AS flow_state_rows,
(SELECT COUNT(*) FROM flow_runs) AS flow_runs,
ROUND((SELECT COUNT(*) FROM flow_run_states)::numeric / GREATEST((SELECT COUNT(*) FROM flow_runs),1), 2) AS avg_flow_states_per_run;
SELECT
(SELECT COUNT(*) FROM task_run_states) AS task_state_rows,
(SELECT COUNT(*) FROM task_runs) AS task_runs,
ROUND((SELECT COUNT(*) FROM task_run_states)::numeric / GREATEST((SELECT COUNT(*) FROM task_runs),1), 2) AS avg_task_states_per_run;
If n_dead_tup is large, you’ll get immediate wins from pruning + vacuum.
You want to delete cancelled tasks (not completed)
Here are safe, targeted cleanup options for Prefect 2.x. Always test on a small time window first and take a backup/snapshot before large deletes.
Option A: Delete cancelled task runs older than N days (cascades their states)
- This removes the entire cancelled task run rows and their state history, but keeps other runs intact.
-- Example: older than 30 days; adjust as needed
-- Helpful index if missing: CREATE INDEX IF NOT EXISTS idx_task_runs_end_time ON task_runs (end_time);
WITH doomed AS (
SELECT tr.id
FROM task_runs tr
JOIN task_run_states s ON s.id = tr.state_id
WHERE s.type IN ('CANCELLED','CANCELLING')
AND tr.end_time IS NOT NULL
AND tr.end_time < now() - interval '30 days'
LIMIT 10000
)
DELETE FROM task_runs tr
WHERE tr.id IN (SELECT id FROM doomed);
-- Repeat the DELETE until it affects 0 rows to process in batches.
Option B: Keep runs but prune old state history (big win for table size)
- Keep the current state each run points to; delete older historical states for cancelled task runs only (or do this for all task runs if you want maximal reduction).
-- Prune task_run_states for CANCELLED runs, keeping the current state
DELETE FROM task_run_states s
USING task_runs r, task_run_states cur
WHERE s.task_run_id = r.id
AND cur.id = r.state_id
AND cur.type IN ('CANCELLED','CANCELLING')
AND s.id <> r.state_id
AND s.timestamp < now() - interval '30 days'
LIMIT 50000; -- run in batches
- You can run a similar prune for flow_run_states if desired:
DELETE FROM flow_run_states s
USING flow_runs r, flow_run_states cur
WHERE s.flow_run_id = r.id
AND cur.id = r.state_id
AND cur.type IN ('CANCELLED','CANCELLING')
AND s.id <> r.state_id
AND s.timestamp < now() - interval '30 days'
LIMIT 50000;
After cleanup, reclaim space
- Update stats so planners pick the right indexes:
VACUUM (ANALYZE) task_run_states;
VACUUM (ANALYZE) flow_run_states;
VACUUM (ANALYZE) task_runs;
VACUUM (ANALYZE) flow_runs;
- To truly shrink on-disk size, consider:
Marvin
10/08/2025, 2:31 PM
- Tag Prefect's DB connections by appending ?application_name=prefect-api to PREFECT_API_DATABASE_CONNECTION_URL
- If you still see no active queries at the time of warnings, the bottleneck may be intermittent (autovacuum, IO contention) or the scheduler loop is delayed by something else. Enabling RDS Performance Insights or pg_stat_statements for a day will typically reveal the heavy statements.
Two more tips that usually help
- Narrow the scheduler horizon and reduce per-loop work (2.x env vars):
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
- Ensure autovacuum keeps up. If n_dead_tup is high for states tables, consider increasing autovacuum aggressiveness for those tables or scheduling periodic manual VACUUM.
If you can share:
- Whether “9M” for flow_runs is rows or 9 MB
- The avg states per run from the queries above
- A sample slow statement from Performance Insights/pg_stat_statements
I can recommend exact indexes and a tailored pruning cadence (and convert the deletes into a scheduled job).
Srul Pinkas
10/08/2025, 2:51 PM
The task_run_states table is actually called task_run_state in our schema. But I fixed it and did a count instead of a delete (with a JOIN rather than USING) - about 40% of the records will be deleted (great!), it seems.
Answering your questions: 9M means 9MB, but also around that in row count.
Dead tuples are super small (2, I think).
Average flow-states per run: 11307.28
Average task-states per run: 7601.34
Marvin
10/08/2025, 2:51 PM
Marvin
10/08/2025, 2:52 PM
- Keep only the current state (the one referenced by r.state_id) when the run is terminal (COMPLETED/FAILED/CANCELLED/CRASHED). Delete all other historical states for that run.
- Do it in batches to avoid long locks. Run repeatedly until 0 rows deleted.
Flow run states (canonical names; change to your singular names if needed)
-- Optional: add indexes to speed joins if missing
-- CREATE INDEX IF NOT EXISTS idx_flow_run_states_flow_run_id ON flow_run_states (flow_run_id);
-- CREATE INDEX IF NOT EXISTS idx_flow_runs_state_id ON flow_runs (state_id);
WITH doomed AS (
SELECT s.id
FROM flow_run_states s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_states cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM flow_run_states
WHERE id IN (SELECT id FROM doomed);
Task run states
-- Optional indexes
-- CREATE INDEX IF NOT EXISTS idx_task_run_states_task_run_id ON task_run_states (task_run_id);
-- CREATE INDEX IF NOT EXISTS idx_task_runs_state_id ON task_runs (state_id);
WITH doomed AS (
SELECT s.id
FROM task_run_states s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_states cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM task_run_states
WHERE id IN (SELECT id FROM doomed);
Notes
- Replace `flow_run_states`/`task_run_states` with your actual table names (`flow_run_state`/`task_run_state`) if that’s what your schema uses.
- You can add an age filter if you want to be extra safe:
- AND s.timestamp < now() - interval '7 days'
- After each batch, re-run until 0 rows deleted.
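To automate the "run until 0 rows deleted" loop, a hedged sketch that drives the batched prune from Python; it assumes psycopg2 is installed, the singular table name used here (flow_run_state), and a placeholder DSN you would replace:
```python
import psycopg2  # pip install psycopg2-binary

PRUNE_SQL = """
WITH doomed AS (
    SELECT s.id
    FROM flow_run_state s
    JOIN flow_runs r ON r.id = s.flow_run_id
    JOIN flow_run_state cur ON cur.id = r.state_id
    WHERE r.state_id IS NOT NULL
      AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
      AND s.id <> r.state_id
    LIMIT 50000
)
DELETE FROM flow_run_state WHERE id IN (SELECT id FROM doomed);
"""

def prune(dsn: str) -> None:
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            while True:
                cur.execute(PRUNE_SQL)
                deleted = cur.rowcount  # rows removed by this batch
                conn.commit()
                print(f"deleted {deleted} rows")
                if deleted == 0:
                    break

if __name__ == "__main__":
    prune("postgresql://user:password@host:5432/prefect")  # placeholder DSN
```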
Follow-up maintenance
VACUUM (ANALYZE) flow_run_states;
VACUUM (ANALYZE) task_run_states;
-- For significant space reclaim (requires locks/downtime):
-- VACUUM FULL flow_run_states;
-- VACUUM FULL task_run_states;
If downtime is hard, consider pg_repack for online compaction.
Quick sanity checks before/after
-- Find the worst offenders (runs with the most states)
SELECT flow_run_id, COUNT(*) AS n
FROM flow_run_states
GROUP BY flow_run_id
ORDER BY n DESC
LIMIT 20;
SELECT task_run_id, COUNT(*) AS n
FROM task_run_states
GROUP BY task_run_id
ORDER BY n DESC
LIMIT 20;
Why did states balloon?
Averages in the thousands suggest something is repeatedly flipping states. Common causes:
- High retry counts with short delays or tasks that keep re-entering RUNNING/SCHEDULED
- Processes repeatedly re-attaching to the same runs after restarts (workers crashing/restarting)
- Custom code or triggers that set states frequently
- Services like “late runs” thrashing on problematic runs
If you can share a couple of the top offending run IDs and their state type distribution, we can diagnose:
-- Distribution of state types for a specific run
SELECT type, COUNT(*)
FROM flow_run_states
WHERE flow_run_id = '<uuid>'
GROUP BY type
ORDER BY COUNT(*) DESC;
To prevent recurrence
- Keep a periodic prune job (weekly/daily) that keeps only the current state for terminal runs (queries above without LIMIT).
- Review retry policies and ensure they’re not excessively aggressive.
- Check for worker restarts/health that could cause repeated transitions.
Marvin
10/08/2025, 2:52 PM
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
If you want, paste one example run's state-type breakdown and your retry config; I'll help pinpoint the pattern and suggest guardrails so the states table doesn't balloon again.
Srul Pinkas
10/08/2025, 2:57 PM
dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5 655003
d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079 652791
1c57c018-8154-4f5b-b0d5-c53e325207ff 455405
2fff65a2-647f-425e-910a-386b98648986 422391
4e9a8f4b-53e9-4b0c-890d-7d71a5df079b 422391
bbca4c50-81f9-4b8b-bc74-e3f7c9042335 422390
Breakdown of the first one:
CANCELLING 655004
SCHEDULED 1
PENDING 1
RUNNING 1
CRASHED 1
Results for the second one are the same.. perhaps DB load (which we are about to fix) is what's causing the CANCELLING state switches?
Marvin
10/08/2025, 2:57 PM
Marvin
10/08/2025, 2:59 PM
prefect flow-run cancel dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5
prefect flow-run cancel d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079
...
- Then confirm the current state:
SELECT r.id, s.type, s.timestamp
FROM flow_runs r
JOIN flow_run_state s ON s.id = r.state_id
WHERE r.id IN ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5','d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079');
2) Prune historical states, keeping only the final state
- Use your singular table names. Run in batches until no more rows are deleted. This keeps only the final state for terminal runs.
-- Flow runs: keep only the current terminal state
WITH doomed AS (
SELECT s.id
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
-- Task runs: same pattern
WITH doomed AS (
SELECT s.id
FROM task_run_state s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
- Optional safety: also add AND s.timestamp < now() - interval '7 days'
if you want to keep the most recent history.
- Maintenance after pruning:
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- For real space reclaim (locks/downtime): VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
3) Identify who/what is re-asserting CANCELLING
- Look for repeated cancellation requests in the state messages/details:
SELECT message, COUNT(*)
FROM flow_run_state
WHERE flow_run_id = 'dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5'
AND type = 'CANCELLING'
GROUP BY message
ORDER BY COUNT(*) DESC
LIMIT 10;
SELECT state_details, COUNT(*)
FROM flow_run_state
WHERE flow_run_id = 'dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5'
AND type = 'CANCELLING'
GROUP BY state_details
ORDER BY COUNT(*) DESC
LIMIT 10;
- Common culprits:
- An external script repeatedly calling the cancel API for the same run
- The cancellation cleanup service re-touching runs that never finalize because workers are offline or failing to report CANCELLED
- Worker restarts or crashes causing repeated “cancel requested” handling
- Check worker logs for these run IDs and the server logs around when CANCELLING spikes.
4) Temporarily reduce/disable the cancellation churn while you clean
- If available in your 2.20 build, you can slow or temporarily disable cancellation cleanup to stop the flood while you prune. Look for an env like:
PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED=false
- If that setting isn’t present in your 2.20 image, slow the scheduler loops and raise the DB timeout while you clean:
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
5) Make it observable
- Tag Prefect DB connections so you can spot them:
- Append ?application_name=prefect-api
to PREFECT_API_DATABASE_CONNECTION_URL
Marvin
10/08/2025, 2:59 PM
- You can temporarily set PREFECT_API_DATABASE_ECHO=true. Use sparingly (very verbose).
Your numbers confirm the plan
- Avg 11k flow states/run and 7.6k task states/run = extreme churn.
- Dead tuples near zero → not (just) bloat; these are live rows. Pruning is the right fix.
- Once you force-finalize the worst runs, pruning will cut the states table dramatically. Then VACUUM/pg_repack to reclaim space.
If you share:
- The message/state_details sample from the CANCELLING rows
- Whether those runs’ current state is CANCELLING or CANCELLED after prefect flow-run cancel
I can help pinpoint the source and suggest a guardrail so this doesn't recur.
Srul Pinkas
10/08/2025, 3:01 PM
"The parent flow run was cancelled."
Marvin
10/08/2025, 3:01 PM
Marvin
10/08/2025, 3:02 PM
prefect flow-run cancel dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5
prefect flow-run cancel d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079
prefect flow-run cancel 1c57c018-8154-4f5b-b0d5-c53e325207ff
- Verify:
SELECT r.id, s.type, s.timestamp
FROM flow_runs r
JOIN flow_run_state s ON s.id = r.state_id
WHERE r.id IN ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5','d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079','1c57c018-8154-4f5b-b0d5-c53e325207ff');
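If you'd rather do this from Python than the CLI, a hedged sketch using the 2.x client; it assumes set_flow_run_state accepts force=True to bypass orchestration rules (check your client version):
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.states import Cancelled

STUCK_RUN_IDS = [
    "dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5",
    "d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079",
    "1c57c018-8154-4f5b-b0d5-c53e325207ff",
]

async def finalize(run_ids: list[str]) -> None:
    async with get_client() as client:
        for run_id in run_ids:
            # Force the terminal state so server services stop re-asserting CANCELLING
            await client.set_flow_run_state(
                flow_run_id=run_id,
                state=Cancelled(message="Finalized manually after being stuck in CANCELLING"),
                force=True,
            )
            print(f"cancelled {run_id}")

if __name__ == "__main__":
    asyncio.run(finalize(STUCK_RUN_IDS))
```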
2) Optional: slow cancellation enforcement temporarily
- If available in your 2.20 image, you can slow or pause cancellation cleanup while you clean to stop re-assertions. If not present, at least raise PREFECT_API_DATABASE_TIMEOUT
and reduce scheduler work temporarily as shared earlier to reduce the rate of reassertion.
- Ref: Prefect 2 settings
Prune state history (Option B) — tuned for your singular table names
A) Terminal runs: keep only the current terminal state; delete all older states
WITH doomed AS (
SELECT s.id
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
WITH doomed AS (
SELECT s.id
FROM task_run_state s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
B) Runs still stuck in CANCELLING: keep only the latest CANCELLING state
- First, ensure flow_runs.state_id points to the latest CANCELLING state for consistency:
WITH latest AS (
SELECT DISTINCT ON (s.flow_run_id) s.id, s.flow_run_id, s.timestamp
FROM flow_run_state s
WHERE s.type = 'CANCELLING'
ORDER BY s.flow_run_id, s.timestamp DESC
)
UPDATE flow_runs r
SET state_id = latest.id
FROM latest, flow_run_state cur
WHERE r.id = latest.flow_run_id
AND cur.id = r.state_id
AND cur.type = 'CANCELLING'
AND r.state_id <> latest.id;
- Then delete older CANCELLING rows in batches (Postgres does not allow LIMIT on DELETE, so pick a batch of ctids in a subquery):
DELETE FROM flow_run_state s
USING (
SELECT s.ctid
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
JOIN (
SELECT flow_run_id, MAX(timestamp) AS ts
FROM flow_run_state
WHERE type = 'CANCELLING'
GROUP BY flow_run_id
) latest ON latest.flow_run_id = s.flow_run_id
WHERE s.type = 'CANCELLING'
AND cur.type = 'CANCELLING'
AND s.timestamp < latest.ts
LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
- Run these in batches until 0 rows deleted. Then:
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- For disk reclaim (requires locks/downtime): VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
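If you'd rather drive those batches from a script than re-run them by hand, a compact sketch of the run-until-zero loop (psycopg2; paste in whichever DELETE above you are draining):
```python
# Sketch: repeat one of the batched DELETEs above until it removes no more rows,
# committing after each batch. PREFECT_DB_DSN is the same placeholder DSN env var.
import os
import time
import psycopg2

def drain(delete_sql: str) -> None:
    with psycopg2.connect(os.environ["PREFECT_DB_DSN"]) as conn:
        with conn.cursor() as cur:
            while True:
                cur.execute(delete_sql)
                deleted = cur.rowcount
                conn.commit()
                print("deleted:", deleted)
                if deleted == 0:
                    break
                time.sleep(0.2)  # brief pause to be gentle on the server

# drain("<paste one of the DELETE ... statements here>")
```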
Find the parent and the cascade pattern
- For one offender, identify its parent and siblings — verify the parent is CANCELLED and children are the ones churning:
```
-- Find the parent flow run of a child subflow
-- (child flow runs link to the parent via flow_runs.parent_task_run_id -> task_runs.id)
SELECT parent.id AS parent_flow_run_id, child.id AS child_flow_run_id
FROM flow_runs child
JOIN task_runs tr ON tr.id = child.parent_task_run_id
JOIN flow_runs parent ON parent.id = tr.flow_run_id
WHERE child.id = '<child_flow_run_id>';
```
Marvin
10/08/2025, 3:02 PM- If you see many children in CANCELLING, finalize them (CLI cancel) and then prune.
Prevent recurrence
- Ensure workers/infrastructure actually terminate runs on cancel. If you’re using Docker/K8s/Dask/Ray, verify your cancellation behavior kills child processes; otherwise the server will keep re-requesting cancellation.
- Add run-level timeouts and reasonable retry policies to avoid long-running stuck runs.
- Keep a periodic prune job (daily/weekly) that:
- For terminal runs: keeps only current state
- For runs stuck in CANCELLING for > N minutes: keeps only the latest CANCELLING state or proactively finalizes them to CANCELLED
- While you clean up, reduce scheduler pressure and bump DB timeout:
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
Refs
- Settings: Prefect 2 settings
- Related scheduler heaviness report: RRule schedules causing scheduler CPU and long loops
If you can share whether those children finalize to CANCELLED after the CLI cancel, we can confirm the loop is broken and then aggressively prune the historical CANCELLING rows.Srul Pinkas
10/08/2025, 3:06 PMprefect flow-run cancel
but it's still only SCHEDULED for cancellation - and status in UI is still Cancelling
, so this is not setting the final state for real as you mentioned.Marvin
10/08/2025, 3:06 PMMarvin
10/08/2025, 3:07 PMprefect flow-run cancel
only sets the run to CANCELLING. Normally, a worker or the server’s cleanup service will finalize it to CANCELLED. Since yours are stuck, the fastest way is to force the final state.
Two ways to force-finalize stuck runs to CANCELLED
A) Use the REST API with force=true
- Adjust for your API URL shape. If your PREFECT_API_URL
already ends with /api
, use it directly; otherwise add /api
. If you use auth, add the Authorization header.
curl -X POST "$PREFECT_API_URL/flow_runs/dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5/set_state?force=true" \
-H "Content-Type: application/json" \
-d '{"state": {"type":"CANCELLED","name":"Cancelled","message":"Forced cancel"}}'
- Repeat for each offender. Then verify:
SELECT r.id, s.type, s.timestamp
FROM flow_runs r
JOIN flow_run_state s ON s.id = r.state_id
WHERE r.id IN ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5','d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079');
B) Use a tiny Python script (Prefect 2.x)
import asyncio
from prefect.client.orchestration import PrefectClient
from prefect.states import Cancelled
RUN_IDS = [
"dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5",
"d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079",
# ...
]
async def main():
async with PrefectClient() as client:
for rid in RUN_IDS:
state = Cancelled(name="Forced cancel")
await client.set_flow_run_state(rid, state=state, force=True)
print("Forced CANCELLED:", rid)
asyncio.run(main())
- This bypasses orchestration guardrails and sets the final state.
Then prune the storm of CANCELLING states (Option B)
- Keep only the current state for terminal runs and delete older history. Since your schema uses singular names, use these:
WITH doomed AS (
SELECT s.id
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
WITH doomed AS (
SELECT s.id
FROM task_run_state s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
- Run in batches until 0 rows; then:
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- For real disk reclaim (locks/downtime):
-- VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
If a run is still in CANCELLING after the forced API call
- Confirm the HTTP response was 200 and that you used force=true
.
- Make sure you hit the correct base URL (some setups have PREFECT_API_URL
already including /api
).
- If it still won’t stick, we can do a surgical DB remediation: insert a new CANCELLED state row and point flow_runs.state_id
to it. I can give you that SQL if needed; the API method is preferred.
Reduce the chance of recurrence
- These children are getting repeatedly set to CANCELLING because “The parent flow run was cancelled.” Once you force them to CANCELLED, the loop should stop.
- Ensure cancellations actually terminate child work on your infra (Docker/K8s/etc.); otherwise the server keeps reasserting CANCELLING.
- Consider a periodic job that:
- For terminal runs, keeps only the current state (queries above without LIMIT)
- For runs stuck in CANCELLING for > N minutes, forcibly sets CANCELLED via API
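As a sketch of that periodic guardrail — assuming the Prefect 2.x client filter schemas; adjust the threshold and add your pruning SQL as needed:
```python
# Sketch: force-CANCEL flow runs stuck in CANCELLING for more than N minutes.
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

STUCK_AFTER = timedelta(minutes=30)  # assumption: tune to your infra startup times

async def main():
    async with get_client() as client:  # picks up PREFECT_API_URL
        flt = FlowRunFilter(
            state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.CANCELLING]))
        )
        runs = await client.read_flow_runs(flow_run_filter=flt, limit=200)  # paginate if needed
        cutoff = datetime.now(timezone.utc) - STUCK_AFTER
        for run in runs:
            if run.state and run.state.timestamp < cutoff:
                await client.set_flow_run_state(
                    run.id, state=Cancelled(name="Forced cancel"), force=True
                )
                print("Forced CANCELLED:", run.id)

asyncio.run(main())
```
Run it on a schedule (cron, CI, or a Prefect deployment of its own) alongside the pruning SQL.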
If you want, share the exact API URL you’re using and the response you get from the force call; I can sanity-check the endpoint and payload.Srul Pinkas
10/08/2025, 3:12 PMTraceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/srul.pinkas/.pyenv/versions/3.9.8/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/srul.pinkas/.pyenv/versions/3.9.8/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "<stdin>", line 2, in main
TypeError: __init__() missing 1 required positional argument: 'api'
Marvin
10/08/2025, 3:12 PMMarvin
10/08/2025, 3:13 PMPrefectClient
requires an api
argument. The easiest fix is to use get_client()
which reads PREFECT_API_URL
from your env.
Try one of these two options.
Option A: use get_client() (recommended)
import asyncio
from prefect.client.orchestration import get_client
from prefect.states import Cancelled
RUN_IDS = [
"dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5",
"d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079",
# ...
]
async def main():
async with get_client() as client: # picks up PREFECT_API_URL
for rid in RUN_IDS:
state = Cancelled(name="Forced cancel")
resp = await client.set_flow_run_state(rid, state=state, force=True)
print("Forced CANCELLED:", rid, getattr(resp, "status", resp))
asyncio.run(main())
Option B: pass the API URL explicitly
import os, asyncio
from prefect.client.orchestration import PrefectClient
from prefect.states import Cancelled
API = os.environ["PREFECT_API_URL"] # e.g. http://<host>:4200/api
RUN_IDS = ["dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5", "d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079"]
async def main():
async with PrefectClient(api=API) as client:
for rid in RUN_IDS:
state = Cancelled(name="Forced cancel")
resp = await client.set_flow_run_state(rid, state=state, force=True)
print("Forced CANCELLED:", rid, getattr(resp, "status", resp))
asyncio.run(main())
If you prefer cURL, you can force via the REST API as well (adjust base URL and auth as needed):
curl -X POST "$PREFECT_API_URL/flow_runs/dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5/set_state?force=true" \
-H "Content-Type: application/json" \
-d '{"state": {"type":"CANCELLED","name":"Cancelled","message":"Forced cancel"}, "force": true}'
If the forced set still doesn’t stick or returns a rejection, tell me the response you get and we’ll either adjust the request or do a surgical DB fix:
- Insert a CANCELLED state row, then point the run to it:
-- Replace with your singular table names if needed
WITH new_state AS (
INSERT INTO flow_run_state (flow_run_id, type, name, message, timestamp)
VALUES ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5', 'CANCELLED', 'Cancelled', 'Forced cancel', now())
RETURNING id
)
UPDATE flow_runs r
SET state_id = new_state.id
FROM new_state
WHERE r.id = 'dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5';
Then run your pruning queries to drop the hundreds of thousands of old CANCELLING rows and VACUUM.
Once you confirm the forced CANCELLED worked, I’d:
- Prune historical states (keep only the current state for terminal runs)
- Consider a small periodic job to do that pruning going forward
- Double-check your infra cancellation actually terminates child work, so the server doesn’t keep reasserting CANCELLING on children of a cancelled parent
If anything in the snippets above throws or returns a REJECT/WAI status, paste it here and I’ll adjust the approach.Srul Pinkas
10/08/2025, 3:18 PMthe python script worked - the state now shows as Forced Cancel
(note that this is not CANCELLED - should we change anything in our scripts/sqls?).
I ran it for all >200k top-offenders (30 runs). i also have about 100 runs with ~10k states each, and the rest are single digits. is that also super-weird and should i fix those too, or only the 200k+ ones which i did? (note that i sometimes have 50 sub-flows in a run, and also sometimes wait about a minute for a machine to come up - so a few states per run are expected).Marvin
10/08/2025, 3:18 PMMarvin
10/08/2025, 3:19 PMNo changes needed - "Forced cancel" is only the state name; its type is CANCELLED, and the scripts/SQL filter on type, e.g. WHERE type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
Should you fix the ~100 runs with ~10k states?
- 10k states/run is still very high. It’s worth investigating, but you don’t have to stop everything for them right now.
- First, prune history so they don’t waste space:
- The “keep only the current state for terminal runs” query will clean these too if they’re terminal.
- For non-terminal runs stuck in CANCELLING, keep only the latest CANCELLING state.
Here are the cleanup queries using your singular table names.
Prune terminal runs (keep only current terminal state)
WITH doomed AS (
SELECT s.id
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
WITH doomed AS (
SELECT s.id
FROM task_run_state s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
Run repeatedly until 0 rows deleted; then:
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
For runs still in CANCELLING: keep only the latest CANCELLING state
-- Ensure flow_runs.state_id points to the latest CANCELLING
WITH latest AS (
SELECT DISTINCT ON (s.flow_run_id) s.id, s.flow_run_id, s.timestamp
FROM flow_run_state s
WHERE s.type = 'CANCELLING'
ORDER BY s.flow_run_id, s.timestamp DESC
)
UPDATE flow_runs r
SET state_id = latest.id
FROM latest, flow_run_state cur
WHERE r.id = latest.flow_run_id
AND cur.id = r.state_id
AND cur.type = 'CANCELLING'
AND r.state_id <> latest.id;
-- Delete older CANCELLING rows in batches (LIMIT is not valid on DELETE, so pick a batch of ctids in a subquery)
DELETE FROM flow_run_state s
USING (
SELECT s.ctid
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
JOIN (
SELECT flow_run_id, MAX(timestamp) AS ts
FROM flow_run_state
WHERE type = 'CANCELLING'
GROUP BY flow_run_id
) latest ON latest.flow_run_id = s.flow_run_id
WHERE s.type = 'CANCELLING'
AND cur.type = 'CANCELLING'
AND s.timestamp < latest.ts
LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
Diagnose why those 10k runs ballooned
- Check the state-type distribution on a few of them. If it’s mostly CANCELLING/RETRYING/RUNNING flips, there’s a behavioral loop to address.
SELECT type, COUNT(*)
FROM flow_run_state
WHERE flow_run_id = '<run_id_with_10k_states>'
GROUP BY type
ORDER BY COUNT(*) DESC;
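To check several runs at once, a small sketch that lists the biggest offenders and their state-type mix (psycopg2, same PREFECT_DB_DSN placeholder as before):
```python
# Sketch: top flow runs by state-row count, with each run's state-type breakdown.
import os
import psycopg2

TOP_SQL = """
SELECT flow_run_id, COUNT(*) AS n
FROM flow_run_state
GROUP BY flow_run_id
ORDER BY n DESC
LIMIT 20;
"""
BREAKDOWN_SQL = """
SELECT type, COUNT(*)
FROM flow_run_state
WHERE flow_run_id = %s
GROUP BY type
ORDER BY COUNT(*) DESC;
"""

with psycopg2.connect(os.environ["PREFECT_DB_DSN"]) as conn:
    with conn.cursor() as cur:
        cur.execute(TOP_SQL)
        offenders = cur.fetchall()
        for run_id, n in offenders:
            cur.execute(BREAKDOWN_SQL, (run_id,))
            print(run_id, n, cur.fetchall())
```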
- Common causes:
- Parent cancellation cascading to many children that never finalize, repeatedly reasserting CANCELLING
- Very aggressive retries or short retry delays causing rapid churn
- Infrastructure not actually terminating work on cancel (server keeps re-requesting cancel)
- Automation repeatedly calling cancel on the same runs
Prevention
- Keep the pruning job as a periodic maintenance task.
- Ensure cancellations on your infra truly terminate processes for subflows/tasks.
- Tame retry policies and add sensible timeouts to flows/tasks to avoid long “stuck” runs.
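For the timeout/retry guardrails, a minimal sketch of flow-level settings (the numbers are illustrative; tune them to your workloads):
```python
# Sketch: flow-level timeout plus a modest retry policy, so runs cannot hang
# indefinitely and retries do not churn states rapidly.
from prefect import flow

@flow(
    timeout_seconds=3600,     # fail the run instead of letting it hang for hours
    retries=1,                # a single retry rather than an aggressive loop
    retry_delay_seconds=300,  # back off so state churn stays low
)
def my_subflow(inputs: list) -> None:
    ...
```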
Once your cleanup is done and things stabilize, you can dial back any temporary scheduler/DB tweaks you set earlier. If you want, share one 10k run’s state distribution and I’ll help pinpoint the cause and suggest exact guardrails.Srul Pinkas
10/08/2025, 3:44 PMthe batched deletes work, but each 50k batch takes about a minute and there are roughly 4M rows to go - any way to speed this up?Marvin
10/08/2025, 3:44 PMMarvin
10/08/2025, 3:46 PMThat pace is normal for tables this size, but a few things help. First, make sure the supporting indexes exist:
-- For flow states
CREATE INDEX IF NOT EXISTS idx_flow_run_state_flow_run_id ON flow_run_state (flow_run_id);
CREATE INDEX IF NOT EXISTS idx_flow_runs_state_id ON flow_runs (state_id);
-- For task states
CREATE INDEX IF NOT EXISTS idx_task_run_state_task_run_id ON task_run_state (task_run_id);
CREATE INDEX IF NOT EXISTS idx_task_runs_state_id ON task_runs (state_id);
ANALYZE flow_run_state; ANALYZE flow_runs;
ANALYZE task_run_state; ANALYZE task_runs;
Fast chunked delete pattern (no CTE, use ctid and commit each batch)
- This pattern avoids materializing a huge CTE and lets you run many small commits. Run these repeatedly until 0 rows are deleted each time. Adjust chunk size to what your DB tolerates (start 20k–50k).
Flow run states (keep only current state for terminal runs)
DELETE FROM flow_run_state s
USING (
SELECT s.ctid
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
) d
WHERE s.ctid = d.ctid;
Task run states
DELETE FROM task_run_state s
USING (
SELECT s.ctid
FROM task_run_state s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
) d
WHERE s.ctid = d.ctid;
How to loop with commits
- Easiest is a tiny client script that repeats the delete until it returns 0 rows, committing each batch.
Example with psycopg (commit per batch)
import os, time, psycopg2
dsn = os.environ["PREFECT_DB_DSN"] # e.g. <postgresql://user:pass@host:5432/db>
chunk = 50000
delete_flow_sql = f"""
DELETE FROM flow_run_state s
USING (
SELECT s.ctid
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT {chunk}
) d
WHERE s.ctid = d.ctid;
"""
delete_task_sql = f"""
DELETE FROM task_run_state s
USING (
SELECT s.ctid
FROM task_run_state s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT {chunk}
) d
WHERE s.ctid = d.ctid;
"""
with psycopg2.connect(dsn) as conn:
conn.autocommit = False
with conn.cursor() as cur:
while True:
cur.execute(delete_flow_sql)
deleted = cur.rowcount
conn.commit()
print("flow_run_state deleted:", deleted)
if deleted == 0:
break
time.sleep(0.2)
while True:
cur.execute(delete_task_sql)
deleted = cur.rowcount
conn.commit()
print("task_run_state deleted:", deleted)
if deleted == 0:
break
time.sleep(0.2)
Alternative: temp table of terminal runs for a faster join
- If you have many non-terminal runs, precomputing terminal runs can make deletes cheaper:
```
CREATE TEMP TABLE term_flow_runs AS
SELECT r.id AS flow_run_id, r.state_id AS keep_state_id
FROM flow_runs r
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED');
CREATE INDEX ON term_flow_runs (flow_run_id);
CREATE INDEX ON term_flow_runs (keep_state_id);
ANALYZE term_flow_runs;
-- Then the chunked delete becomes:
DELETE FROM flow_run_state s
USING (
SELECT s.ctid
FROM flow_run_state s
JOIN term_flow_runs t ON t.flow_run_id = s.flow_run_id
WHERE s.id <> t.keep_state_id
LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
```
Marvin
10/08/2025, 3:46 PM- Do the same for task runs if needed.
Session tweaks while deleting
- Run during off-peak if you can.
- Consider temporarily raising statement_timeout for your session or disabling it (plain SET is session-wide; SET LOCAL only applies inside a transaction block):
SET statement_timeout = 0; -- no timeout for the maintenance session
SET lock_timeout = '2s'; -- don't wait forever on conflicting locks
SET application_name = 'prefect-prune';
After the big delete
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- To reclaim disk (requires locks/downtime): VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
-- Or use pg_repack for online compaction if available
How long will ~4M rows take?
- If you’re getting ~50k/min with small locks/commits, you’re looking at around 80 minutes for 4M — that’s reasonable. Larger batches can backfire due to memory and longer locks. The chunk+commit approach typically gives the best wall-clock with the least impact on other workload.
If you hit any lock waits or see the delete still scanning too much, share EXPLAIN for the chunked delete and we’ll tune indexes further.Srul Pinkas
10/08/2025, 4:04 PMthanks! just to be sure - should i use ctid or id as the field? the python and sql snippets are differentMarvin
10/08/2025, 4:04 PMMarvin
10/08/2025, 4:05 PMEither works - use ctid in the SQL batches (the id variant below is simpler but can be a bit slower). Either way, make sure these indexes exist:
CREATE INDEX IF NOT EXISTS idx_flow_run_state_flow_run_id ON flow_run_state (flow_run_id);
CREATE INDEX IF NOT EXISTS idx_flow_runs_state_id ON flow_runs (state_id);
CREATE INDEX IF NOT EXISTS idx_task_run_state_task_run_id ON task_run_state (task_run_id);
CREATE INDEX IF NOT EXISTS idx_task_runs_state_id ON task_runs (state_id);
ANALYZE flow_run_state; ANALYZE flow_runs;
ANALYZE task_run_state; ANALYZE task_runs;
Chunked delete using ctid (terminal runs: keep only current state)
DELETE FROM flow_run_state s
USING (
SELECT s.ctid
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
DELETE FROM task_run_state s
USING (
SELECT s.ctid
FROM task_run_state s
JOIN task_runs r ON r.id = s.task_run_id
JOIN task_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
If you prefer the id variant (slightly simpler, may be a bit slower):
DELETE FROM flow_run_state s
USING (
SELECT s.id
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
) pick
WHERE s.id = pick.id;
Pure SQL loop (runs on the server; stops when a batch deletes 0 rows)
DO $$
DECLARE
n_deleted BIGINT; -- rows removed by the last batch
BEGIN
LOOP
DELETE FROM flow_run_state s
USING (
SELECT s.ctid
FROM flow_run_state s
JOIN flow_runs r ON r.id = s.flow_run_id
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE r.state_id IS NOT NULL
AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
AND s.id <> r.state_id
LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
GET DIAGNOSTICS n_deleted = ROW_COUNT;
RAISE NOTICE 'flow_run_state deleted %', n_deleted;
IF n_deleted = 0 THEN EXIT; END IF;
END LOOP;
END $$;
Session tips while running batches (plain SET lasts for the session; SET LOCAL would only apply inside a transaction block)
SET application_name = 'prefect-prune';
SET lock_timeout = '2s';
SET statement_timeout = 0; -- just for this maintenance session
After you finish
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- To reclaim disk (locks/downtime):
-- VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
TL;DR: Use the ctid-based chunked delete inside a loop (or the DO $$ loop above). It’s normal that each 50k batch takes ~1 minute on large tables; the loop with commits will still finish faster overall and with less lock impact than giant single-shot deletes. Adjust the LIMIT up/down to hit your sweet spot.Srul Pinkas
10/08/2025, 7:14 PMthe batched deletes and VACUUM finished, but DBeaver still shows the state tables at around 9 GB and our RDS storage didn't drop - is that expected?Marvin
10/08/2025, 7:14 PMMarvin
10/08/2025, 7:15 PMYes - deleting rows does not shrink the files on disk, and plain VACUUM only marks the space as reusable. 1) Check the current sizes:
SELECT relname,
pg_size_pretty(pg_relation_size(oid)) AS table_size,
pg_size_pretty(pg_indexes_size(oid)) AS index_size,
pg_size_pretty(pg_total_relation_size(oid)) AS total_size
FROM pg_class
WHERE relname IN ('flow_run_state','task_run_state');
2) If you need the physical files to shrink:
- Option A (simple, requires downtime/locks): VACUUM FULL rewrites and shrinks
VACUUM FULL flow_run_state;
VACUUM FULL task_run_state;
This takes an exclusive lock per table. Do it in a maintenance window (stop Prefect API/services and workers while it runs).
- Option B (online): pg_repack
- Rewrites tables and indexes online with minimal blocking. Requires installing/using pg_repack client and extension.
- Docs: https://reorg.github.io/pg_repack/
- Good when you can’t take locks but want disk back.
- Option C (manual rewrite): create a new table, copy rows, swap, drop old
- More operational work (constraints, indexes, FKs), but also shrinks files.
3) Don’t forget indexes
- Indexes can be a big chunk of that 9 GB. If you don’t do VACUUM FULL or pg_repack, you can at least shrink indexes:
REINDEX TABLE CONCURRENTLY flow_run_state;
REINDEX TABLE CONCURRENTLY task_run_state;
Important note for RDS costs
- Even if you shrink table files, RDS allocated storage generally cannot be decreased in-place. You’ll reclaim internal free space (future growth uses it), but your storage bill doesn’t drop unless you snapshot/restore into a smaller volume. AWS docs: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_Storage.html
Bottom line
- Your deletes + VACUUM were correct for performance.
- To see DBeaver’s size drop, run VACUUM FULL (with a lock/maintenance window) or use pg_repack.
- If the goal is to lower AWS storage cost, plan a snapshot/restore to a smaller volume after compaction, since RDS doesn’t shrink allocated storage in place.
If you can schedule a window, I’d:
- Stop Prefect API/services and workers
- VACUUM FULL the two states tables (and optionally REINDEX others if needed)
- Start services back up
I’m happy to help plan the exact maintenance steps for your deployment (K8s/Docker/etc.).Srul Pinkas
10/14/2025, 2:49 PMPrefect flow run notification
Flow run xxx_flow/pi-sothis entered state Completed at 2025-10-12T13:09:43.553961+00:00.
Flow ID: 5653c626-455b-466a-944b-c8e72959e2da
Flow run ID: 388f3b8d-19f4-4ae6-bfb8-546b74869476
Flow run URL: <http://localhost:4200/flow-runs/flow-run/388f3b8d-19f4-4ae6-bfb8-546b74869476>
State message: All states completed.
but the localhost-link is really annoying (not clickable of course). i think i asked you this before - is there an easy env-parameter i can change in order to get the actual domain-URL so that the links would work? is this an k8s inner-OS thing i can't control?
i'll type the second (unrelated) question in the next msg while you thinkSrul Pinkas
10/14/2025, 2:52 PMSrul Pinkas
10/15/2025, 9:19 AMMarvin
10/15/2025, 9:20 AMMarvin
10/15/2025, 9:20 AMSrul Pinkas
10/15/2025, 9:20 AMQUESTION 1: the notifications we get look like this:
Prefect flow run notification
Flow run xxx_flow/pi-sothis entered state Completed at 2025-10-12T13:09:43.553961+00:00.
Flow ID: 5653c626-455b-466a-944b-c8e72959e2da
Flow run ID: 388f3b8d-19f4-4ae6-bfb8-546b74869476
Flow run URL: <http://localhost:4200/flow-runs/flow-run/388f3b8d-19f4-4ae6-bfb8-546b74869476>
State message: All states completed.
but the localhost-link is really annoying (not clickable of course). i think i asked you this before - is there an easy env-parameter i can change in order to get the actual domain-URL so that the links would work? is this an k8s inner-OS thing i can't control?
QUESTION 2:
we're trying to remove deployments in a job-clean-up (we create many deployments - for every branch, so we can try out runs even before merge, so we get lots of spammy deployments). the job is written as a gitlab-ci job, that interacts with prefect through Python API. the api works well locally, but for some weird way - the gitlab-ci run only gets two first deployments and that's it. so i guess it's not permissions / API mis-use / time-performance. Have you heard of such a weird behavior? anything i can do to debug this?Marvin
10/15/2025, 9:20 AMMarvin
10/15/2025, 9:23 AMQUESTION 1 (localhost links in notifications): set PREFECT_UI_URL
to your externally reachable domain (and include any path prefix if you’re serving Prefect under a subpath). Example values:
- <https://prefect.mycompany.com>
- <https://prefect.mycompany.com/prefect>
(if you reverse proxy under /prefect)
Make sure this env var is set in the Prefect API/Server pod/container (the component that emits notifications), not just your local machine. Once set, new notifications will include correct, clickable URLs.
It’s not a K8s limitation — it’s a Prefect config defaulting to localhost when unset.
Helpful docs:
- Prefect 2 settings guide
- Self-hosting Prefect 2
Quick examples:
- Kubernetes (Deployment env):
- name: PREFECT_UI_URL
value: "<https://prefect.mycompany.com>"
- Docker Compose:
environment:
- PREFECT_UI_URL=<https://prefect.mycompany.com>
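To confirm the setting is actually picked up inside the API container, a quick sketch:
```python
# Sketch: run inside the API/server container to verify the env var is applied.
from prefect.settings import PREFECT_API_URL, PREFECT_UI_URL

print("PREFECT_UI_URL  =", PREFECT_UI_URL.value())   # should show your external domain
print("PREFECT_API_URL =", PREFECT_API_URL.value())
```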
QUESTION 2 (CI job only “gets two” deployments)
I haven’t seen a deterministic “2 only” bug, but a few common culprits explain this behavior:
- Pagination not handled: Prefect’s deployments list is paginated. If your code doesn’t paginate explicitly, you’ll only get the first page (the default limit
can vary by method/environment).
- Different API target in CI: PREFECT_API_URL
might be pointing to a different server/workspace in CI than locally.
- Filtering differences: Filters (name/prefix/tags) can narrow to just a couple items without being obvious.
- Timeout/partial iteration issues.
To make the cleanup robust, explicitly paginate and/or request a large page size:
import anyio
from prefect.client.orchestration import get_client  # PrefectClient() needs api=; get_client() reads PREFECT_API_URL
async def list_all_deployments():
async with get_client() as client:
all_deps = []
page_size = 200
offset = 0
while True:
deps = await client.read_deployments(limit=page_size, offset=offset)
if not deps:
break
all_deps.extend(deps)
offset += len(deps)
return all_deps
async def delete_branch_deployments(prefix: str):
deps = await list_all_deployments()
async with get_client() as client:
for d in deps:
if d.name.startswith(prefix):
await client.delete_deployment(d.id)
anyio.run(delete_branch_deployments, "your-branch-prefix-")
Debug tips:
- Print the count you receive in CI vs local to confirm pagination differences.
- Ensure PREFECT_API_URL
in CI points to the same server you expect.
- Turn on debug logs in CI to see the requests:
export PREFECT_LOGGING_LEVEL=DEBUG
export PREFECT_LOGGING_EXTRA_LOGGERS=httpx
- Hit the REST endpoint directly to verify pagination:
- POST to /api/deployments/filter
with {"limit": 200, "offset": 0}
and compare total rows returned.
- API reference: Prefect 2 REST API
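For that REST check, a minimal httpx sketch (adjust the base URL; add an Authorization header if your server requires one):
```python
# Sketch: count deployments page by page straight from the REST API,
# to compare what CI sees vs. what you see locally.
import os
import httpx

base = os.environ["PREFECT_API_URL"].rstrip("/")  # e.g. http://<host>:4200/api

total, offset, page = 0, 0, 200
while True:
    resp = httpx.post(f"{base}/deployments/filter", json={"limit": page, "offset": offset})
    resp.raise_for_status()
    batch = resp.json()
    total += len(batch)
    print(f"offset={offset} got={len(batch)}")
    if len(batch) < page:
        break
    offset += len(batch)

print("total deployments visible:", total)
```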
If you can share:
- The snippet you use to list/delete deployments
- The expected number of deployments vs what you see in CI
- The value of PREFECT_API_URL
in CI
…I can help pinpoint whether it’s pagination, filtering, or environment targeting.