Hey guys, A question about sub-flows concurrency ...
# ask-community
s
Hey guys, a question about basic sub-flow concurrency control: I need a flow that runs sub-flows that are fairly heavy (mostly on memory, ~10 min each), and I need to run them for a few dozen different inputs. I'm using Prefect 2.20. I tried two approaches: - simply trigger each sub-flow (through
run_deployment
in order to allocate separate resources). This creates dozens of sub-flows, but the parent job that tries to stay updated and manage them crashes with a connection-limit-reached error (an internal
httpx
crash). I asked Marvin about configuring this in Prefect and didn't see any immediate solution - is more than 4-5 sub-flows in parallel really too much for Prefect? Am I missing something? - create many sub-flows but run them through a concurrency-limited queue (say, 3 at any given time - once one finishes, another kicks in). Since these are flows and not tasks, I had to go through the work-pool/work-queue limit configuration. The problem: it takes a while to get an actual machine (mostly for the first batch from the cluster), and during that wait the not-yet-executed sub-flow briefly looks failed (a -1 signal, until it later succeeds). Because of that temporary failure, other sub-flows also start and I overshoot my limit of 3 (6-7 end up running in parallel), which is odd because it seems like a common race. I looked for a parameter that controls how long to wait before treating the first sub-flow submission as failed, to avoid this edge case, but the only parameter the docs seemed promising about (
job_watch_timeout_seconds
) is described as: "Number of seconds to wait for each event emitted by a job before timing out" - so raising it to 1-2 minutes seems too important/risky to touch. - The third fallback option I use is creating fewer but chubbier jobs - each running 10 different inputs serially - and running those in parallel. But that feels like a poor workaround on a platform that should take care of this. Did I miss any simple way out? Thanks!
b
- simply trigger each sub-flow (through
run_deployment
in order to allocate separate resources). This creates dozens of sub-flows, but the parent job that tries to stay updated and manage them crashes with a connection-limit-reached error (an internal
httpx
crash). I asked Marvin about configuring this in Prefect and didn't see any immediate solution - is more than 4-5 sub-flows in parallel really too much for Prefect? Am I missing something?
Hi Srul! This option makes sense to me. Do you have an example of the error that the parent throws?
As for managing concurrency, have you already explored setting deployment-level concurrency?
s
Thanks for your reply @Bianca Hoch ! About the errors on the parent flow: we suspect this may be due to Karpenter running in the background and killing our pods (perhaps spot-machine related) - either the Prefect server itself causing the connection errors, or the sub-flow pods (where the server doesn't recognize that a pod is dead for some reason, so no retry is initiated and it just keeps waiting). Still investigating. As for concurrency - we're using Prefect 2.20, not 3, where the deployment concurrency limit seems to be available, but in general it doesn't address the issue of not recognizing that machines are on the way before sending new tasks.
Hey @Marvin in prefect 2.20, what's the default timeout for a
run_deployment
call? If that timeout is reached while running, does the call really crash, or does the job keep running so that everything is fine once it succeeds?
m
I couldn't find specific information about the default timeout for the
run_deployment
call in Prefect 2.20. However, generally in Prefect, if a timeout is reached during a flow or task execution, the execution is typically halted. The behavior might vary depending on how the timeout is configured and handled within the flow or task logic itself. To ensure proper handling of timeouts, you might need to explicitly set and manage them in your flow or task configurations. If a task or flow is set to timeout and it does, the execution would stop unless you've implemented logic to handle such scenarios. You can explore more about timeouts and deployment configurations in the Prefect documentation: Prefect Deployments Guide. If you have specific configurations or setups, you might want to check those to confirm the behavior.
s
Hey @Marvin - in order to use a concurrency limit in a
run_deployment
job, i need to use a
work_queue_name
yaml definition that has a concurrency limit? is that enough? or do i also need to make sure the
work_pool --> name
also represents a concurrency-limited entity? (prefect 2.20)
m
To use a concurrency limit with a
run_deployment
job in Prefect 2.20, the work-queue limit is usually enough on its own: 1. Work Queue: Point the deployment at a
work_queue_name
whose queue has a concurrency limit set. Workers will not pick up more runs from that queue than the limit allows, which is what caps your concurrent sub-flows. 2. Work Pool: A concurrency limit on the
work_pool
itself is optional. It acts as an overarching cap, ensuring that the total number of concurrent runs across all queues within the pool does not exceed the specified limit. In summary, a queue-level limit alone is sufficient for your case; add a pool-level limit only if you also want a global cap across every queue in that pool. For more detailed instructions, refer to the Prefect documentation on work pools and queues.
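If you'd rather set or adjust the queue limit programmatically instead of via YAML, here's a rough sketch using the 2.x client; I'm assuming the client methods read_work_queue_by_name and update_work_queue accept these arguments, so treat it as untested and verify against your client version (the CLI equivalent, prefect work-queue set-concurrency-limit, may be simpler):
Copy code
python
import asyncio
from prefect import get_client

async def set_queue_limit():
    async with get_client() as client:
        # "heavy-subflows" is a placeholder work queue name
        queue = await client.read_work_queue_by_name("heavy-subflows")
        await client.update_work_queue(id=queue.id, concurrency_limit=3)

asyncio.run(set_queue_limit())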
s
Hey @Marvin - in prefect 2.20, if i submit many
task
wrapped functions, how do i control the concurrency to not go above 5 at the same time?
m
To control task concurrency in Prefect 2.20 and ensure that no more than 5 tasks run simultaneously, you can use task concurrency limits with task tags. Here's how you can achieve this: 1. Tag Your Tasks: Assign a specific tag to the tasks you want to limit. For example, you might tag all relevant tasks with
"limited_concurrency"
. 2. Set Concurrency Limit: Use Prefect's task concurrency limits to specify the maximum number of concurrent task runs for tasks with the given tag. You can set this limit to 5. Here's a high-level example of how you might implement this:
Copy code
python
from prefect import flow, task

@task(tags=["limited_concurrency"])
def my_task():
    # Task logic here
    pass

@flow
def my_flow():
    for _ in range(10):
        # submit so task runs can execute concurrently; the tag limit caps them at 5
        my_task.submit()

if __name__ == "__main__":
    my_flow()
To enforce the concurrency limit, you would configure it using the Prefect CLI or UI:
Copy code
bash
prefect concurrency-limit create --tag limited_concurrency --limit 5
This setup ensures that no more than 5 tasks with the
"limited_concurrency"
tag run at the same time. Further details can be found in the Prefect Tasks Documentation.
s
@Marvin there's no way to do this in code? when i define the
@task
decorator or when i do the actual submit?
m
In Prefect 2.20, you cannot directly set a concurrency limit in code using the
@task
decorator or when submitting tasks. Concurrency limits are managed via task tags and configured outside of the code, typically using the Prefect CLI or UI. Here's a concise approach to manage concurrency: 1. Tag your tasks: Use the
tags
parameter in the
@task
decorator to assign a specific tag to tasks you want to control. For example:
Copy code
python
   from prefect import task, flow

   @task(tags=["limited_concurrency"])
   def my_task():
       # Task logic here
       pass
2. Set concurrency limits using the CLI: Use the Prefect CLI to set a concurrency limit on the tag:
Copy code
bash
   prefect concurrency-limit create --tag limited_concurrency --limit 5
This approach ensures that no more than 5 tasks with the
"limited_concurrency"
tag run concurrently. Unfortunately, setting concurrency limits directly in code is not supported, and you'll need to use external configuration for this purpose.
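One small refinement: while the limit isn't attached to the @task decorator, you can create the tag-based limit from Python through the client rather than the CLI. A sketch, assuming the 2.x client's create_concurrency_limit method (check that it exists in your version):
Copy code
python
import asyncio
from prefect import get_client

async def ensure_limit():
    async with get_client() as client:
        # Equivalent to: prefect concurrency-limit create --tag limited_concurrency --limit 5
        await client.create_concurrency_limit(
            tag="limited_concurrency",
            concurrency_limit=5,
        )

asyncio.run(ensure_limit())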
s
hey @Marvin, in prefect 2.20, how do I get the Slack token that I put in as a block through the Prefect UI, directly as a webhook URL? must I define a block for this? how do I do it?
m
To retrieve a Slack token you've stored as a block in Prefect 2.20, you can use the
SlackCredentials
block. Here's how you can load the stored Slack credentials: 1. Load the Slack Credentials Block:
Copy code
python
   from prefect_slack import SlackCredentials

   # Replace "BLOCK_NAME" with the name of your block
   slack_credentials_block = SlackCredentials.load("BLOCK_NAME")
2. Get the Slack Client: Once you've loaded the block, you can get a Slack client to interact with Slack:
Copy code
python
   client = slack_credentials_block.get_client()
3. Webhook URL: If you need to set up a webhook, you can refer to Prefect's webhook documentation for managing webhook URLs. Prefect Cloud assigns a webhook URL when you create a webhook, and it looks something like
<https://api.prefect.cloud/hooks/your-webhook-id>
. For more details, you can refer to the Slack Credentials documentation. If you need to create a webhook, you can find more information in the webhook guide. Let me know if you need further assistance!
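If what you stored is actually an incoming webhook URL rather than a bot token, the SlackWebhook notification block is usually the simpler fit. A sketch, assuming a block saved under a placeholder name and that the block exposes the URL as a url secret field (worth confirming on your version):
Copy code
python
from prefect.blocks.notifications import SlackWebhook

# "my-slack-webhook" is a placeholder block name
slack_block = SlackWebhook.load("my-slack-webhook")

# Send a message through the stored webhook
slack_block.notify("Hello from Prefect 2.20")

# If you need the raw webhook URL string itself:
webhook_url = slack_block.url.get_secret_value()
print(webhook_url)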
s
Hi @Marvin - the automatic slack notifications I have set up (with an incoming slack webhook) link to
localhost
instead of a usable URL - where can I configure them to use a proper link?
m
To configure the Slack webhook notifications to use a proper link instead of
localhost
, you should look into how the notification URLs are being templated. Prefect 3.x allows you to use templated variables for dynamic content in notifications. Specifically, you can use the
{{ flow_run|ui_url }}
token to include the correct URL to view the flow run in the Prefect UI. Here are some steps you can take: 1. Check your Automation Configuration: Make sure your automation configuration is set up correctly to use the
ui_url
token. This will ensure that the correct URL is included in your notifications. 2. Use Jinja Templating: Prefect supports Jinja templating, which allows you to dynamically include details such as the flow or pool name. For example, you can use
{{ flow_run|ui_url }}
to insert the URL to the flow run in your Slack message. 3. Update Notification Settings: Ensure that your notification settings in the Prefect configuration or your deployment YAML are properly utilizing these tokens. For more details, you can refer to the Prefect documentation on webhook triggers and automation triggers. If the issue persists, make sure there are no misconfigurations in your deployment or webhook settings that might be defaulting to
localhost
. If you need further assistance with specific configurations, feel free to ask!
s
@Marvin i'm using prefect 2.20, not 3
m
In Prefect 2.x, if your Slack notifications are linking to
localhost
, it's likely due to how the URLs are being generated or configured within your setup. Here are some steps and resources to help you configure it correctly: 1. Slack Webhook Configuration: Ensure that your Slack webhook is correctly set up and that you are using the right URL for notifications. You can find more about configuring Slack webhooks in the Prefect Slack integration documentation. 2. Notification Settings: Check your notification settings in Prefect. You can configure notifications to trigger on specific flow run states and specify the Slack webhook URL. More details can be found in the Prefect notifications guide. 3. Flow Run URLs: If the flow run URLs are pointing to
localhost
, ensure that your Prefect server or agent is correctly configured to use the appropriate hostname or IP address. This might involve setting environment variables or adjusting your Prefect server configuration. 4. Use of Tokens: Make sure that you are using the correct tokens or placeholders in your notification messages to dynamically generate the correct URLs. If the issue is due to a bug or a misconfiguration, reviewing your Prefect server and deployment settings might be necessary. If you need more specific guidance, feel free to provide additional details about your setup!
s
@Marvin - let's go again - I'm using prefect 2.20, and my automatic slack notifications work properly with an incoming webhook token - everything is configured OK, except that the links point to localhost:4200, which makes them unusable. Which environment variables, or any other configuration, can we play with in order to get the real host domain so the links actually work?
m
To configure your Prefect 2.20 server to use a real host domain instead of
localhost
, you can adjust the following environment variables: 1.
PREFECT_API_URL
: This environment variable should be set to the external URL of your Prefect server's API. For example, if your server is accessible at
<https://your-domain.com>
, you would set:
Copy code
bash
   export PREFECT_API_URL="https://your-domain.com/api"
2.
PREFECT_UI_API_URL
: This is specifically for the UI. It should be set to the external proxy URL. If not set, it defaults to
PREFECT_API_URL
. To ensure proper configuration, set it explicitly:
Copy code
bash
   export PREFECT_UI_API_URL="https://your-domain.com/api"
These configurations will ensure that links generated in notifications point to the correct domain rather than
localhost
. For more detailed guidance on configuring your Prefect server, you can refer to the Prefect hosting guide and the settings guide. Adjust your server configuration and restart your server to apply these changes.
s
hey @Marvin - we're trying to deploy to prod prefect-server for the first time, and the deploy-command took forever (and crashed after 25 minutes) while running the
work_pool = await client.read_work_pool(deploy_config["work_pool"])
line inside prefect's internal code, from our CI. The pool name from the yaml is configured on the server. Can you think of why this would happen? I think this might be the first line of code that interacts with the client.
m
The issue you're experiencing with the
read_work_pool
line taking a long time and eventually crashing might be related to a few potential causes: 1. Work Pool Configuration: Ensure that your work pool is properly configured and that there are active workers connected. A work pool needs at least one online worker sending heartbeats to be considered ready to execute work. You can manage work pools using the Prefect CLI. 2. Network or API Issues: There might be network issues or API endpoint misconfigurations causing the delay. Verify that your Prefect server is accessible and that the API URL is correctly set. You can set the
PREFECT_API_URL
environment variable to ensure the client connects to the right endpoint. 3. Timeouts and Retries: Consider configuring timeouts and retries for your client connections if network instability is an issue. This can help handle transient failures more gracefully. 4. Version Mismatch or Bugs: Ensure that your Prefect server and client versions are compatible. Check the Prefect GitHub issues for any known bugs related to
read_work_pool
or similar functions that might affect version 2.20. For more details on managing work pools, you can refer to the work pools documentation. If the problem persists, you might want to look into logs for more specific error messages or consider reaching out to the Prefect community for support.
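To separate a slow/unreachable API from a problem in the deploy step itself, you could run a small probe from the same CI environment. An untested sketch that makes the same client call your deploy hangs on, but with an explicit timeout so it fails fast (the pool name is a placeholder):
Copy code
python
import asyncio
from prefect import get_client

async def probe(pool_name: str = "my-work-pool"):  # placeholder pool name
    async with get_client() as client:
        # Fail after 30s instead of hanging for 25 minutes
        pool = await asyncio.wait_for(client.read_work_pool(pool_name), timeout=30)
        print("OK:", pool.name, pool.type)

asyncio.run(probe())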
s
in prefect 2.20, the cron-format in the yaml is min/hour/day/month/weekday? so
0 0 * * 3
means midnight wednesday once a week? @Marvin
m
Yes, in Prefect 2.20, the cron format in the YAML configuration follows the standard cron syntax, which is
min/hour/day/month/weekday
. Therefore, the expression
0 0 * * 3
indeed schedules a task to run at midnight (00:00) every Wednesday. For more details on scheduling in Prefect, you can refer to the Prefect 2.x scheduling documentation. If you have any more questions about scheduling or anything else, feel free to ask.
s
@Marvin what's the
day_or
meaning in the yaml?
m
In Prefect, the
day_or
parameter in the YAML schedule configuration specifies how the day of the month and the day of the week entries in a cron expression are interpreted. - If
day_or
is set to
True
, the schedule will run if either the day of the month or the day of the week condition is satisfied. This means the conditions act as an OR operation. - If
day_or
is set to
False
, both the day of the month and the day of the week conditions must be satisfied for the schedule to run, acting as an AND operation. This behavior allows for more flexible scheduling options, depending on whether you want the task to run on specific days of the week, specific days of the month, or both. For more details, you can refer to the Prefect 2.x documentation on scheduling.
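For completeness, the same schedule expressed in Python; a sketch assuming the 2.x CronSchedule schema (the exact import path can vary slightly across 2.x releases):
Copy code
python
from prefect.client.schemas.schedules import CronSchedule

# Midnight every Wednesday; day_or=True is the standard cron OR behavior
schedule = CronSchedule(cron="0 0 * * 3", timezone="UTC", day_or=True)
print(schedule)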
s
@Marvin in prefect 2.20, is there a way to download log-files of a run that's already complete? or at least search in logs instead of UI scrolling?
m
In Prefect 2.20, while you can view logs for completed runs through the Prefect UI, there isn't a direct feature to download log files or search logs programmatically from the UI itself. However, you can customize logging configurations and capture logs using standard Python logging, which might be redirected to a file or another logging service. This way, you can access and search logs outside of the Prefect UI. For more information on logging configurations, you can refer to the Prefect logging guide. If you need to search logs programmatically, consider setting up a logging configuration that writes logs to a file or external logging service where you can perform searches more efficiently.
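If pulling logs straight from the API works for you, here's a rough sketch using the 2.x client and log filters; the filter class names are from memory, so verify the imports against your installed version:
Copy code
python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId

async def dump_logs(flow_run_id: str, path: str = "flow_run.log"):
    async with get_client() as client:
        log_filter = LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]))
        # For very long runs you may need to page through results with offset
        logs = await client.read_logs(log_filter=log_filter, limit=1000)
        with open(path, "w") as f:
            for log in logs:
                f.write(f"{log.timestamp} | {log.level} | {log.message}\n")

# asyncio.run(dump_logs("<FLOW_RUN_ID>"))  # paste the run ID from the UI
Once the logs are in a file, you can grep or open them in any editor instead of scrolling the UI.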
s
@Marvin i can't search in logs either, right? only scroll
m
Correct, in the Prefect UI, you are limited to scrolling through logs for searching. There isn't a built-in feature for searching logs directly within the UI. To perform searches on logs, you would need to set up a logging configuration that exports logs to a file or an external logging service where you can perform more advanced search operations.
s
@Marvin can i change the loading window-size, to get more in each chunk? the sliding-window scrolling takes forever
m
Currently, there doesn't appear to be a built-in feature or a documented method to adjust the log loading window size in Prefect's UI to load more logs in each chunk. This limitation can indeed make scrolling through logs a bit cumbersome. If this is a significant pain point, you might consider exporting logs to an external logging service or file where you can configure the viewing settings to better suit your needs. Alternatively, you could check for any updates or feature requests related to this on the Prefect GitHub repository for potential workarounds or future enhancements.
s
@Marvin can I get the tags of the current flow run programmatically, while running inside the flow?
m
To programmatically access the tags of a flow while it's running, you can use the Prefect API to retrieve the flow's metadata. However, the specific method to do this isn't directly documented in the excerpts I found. Generally, you would use the Prefect context or runtime environment to access metadata related to the flow. In Prefect 2.x, you might consider using the
prefect.context
to access certain runtime information, though tags specifically might not be directly accessible this way. If you are looking for a more concrete solution, you can check the Prefect documentation on flows for any updates or detailed methods to access flow metadata programmatically. If you have further questions or need more specific guidance, feel free to ask!
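One thing worth trying first: recent 2.x releases expose the current run's tags directly on the runtime module (going from memory here - it should simply be empty outside a flow run):
Copy code
python
from prefect import flow
from prefect.runtime import flow_run

@flow
def show_tags():
    # Tags of the current flow run (empty if none are set)
    print("flow run tags:", flow_run.tags)

if __name__ == "__main__":
    show_tags()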
s
@Marvin it seems i can get the current run's
deployment_id
- from which I can probably get the deployment tags (from the yaml), right? Can I get a snippet to extract the deployment tags? Also, since I sometimes run locally - can I make it safe so it doesn't crash when there's no deployment and I'm just running plain local Python with no server?
m
You can access the
deployment_id
from the
prefect.runtime.flow_run
module. If you're running the flow as part of a deployment, you can use this
deployment_id
to fetch deployment details and tags. Here's a snippet to safely extract deployment tags, even if running locally without a deployment:
Copy code
python
from prefect.runtime.flow_run import id as flow_run_id
from prefect.client import get_client

async def get_deployment_tags():
    try:
        # Only proceed if there's a flow run ID, indicating a deployment
        if not flow_run_id:
            print("Running locally, no deployment ID available.")
            return None

        async with get_client() as client:
            # Fetch the flow run details
            flow_run = await client.read_flow_run(flow_run_id)
            deployment_id = flow_run.deployment_id

            # Fetch deployment details using the deployment ID
            if deployment_id:
                deployment = await client.read_deployment(deployment_id)
                return deployment.tags

    except Exception as e:
        print(f"An error occurred: {e}")
        return None

# Example usage
import asyncio
tags = asyncio.run(get_deployment_tags())
print(f"Deployment tags: {tags}")
This code checks if there's a
flow_run_id
, which indicates a deployment context. If not, it assumes you're running locally and exits gracefully. If there is a deployment context, it fetches the deployment details and retrieves the tags.
s
@Marvin do i need the
flow
object at all? or just use context to get the live stats?
m
You don't need to create a
flow
object to access runtime information when you are trying to get live stats or metadata about the current flow run. Using the
prefect.runtime
context is sufficient for accessing this kind of information. The
prefect.runtime.flow_run
module provides direct access to various attributes of the flow run, such as its ID, tags, and other metadata. In the example I provided, I used
prefect.runtime.flow_run.id
to check if the code is running within a deployment and to retrieve the deployment ID. This approach is efficient and aligns with accessing runtime metadata directly. If you have any more questions or need further clarification, feel free to ask!
s
Hey @Marvin - I configured slack notifications on the running/crashed/completed states of scheduled jobs, and I almost always get a crash notification right after the run starts (sometimes even before!) although there was no crash - just some time spent waiting for a pod to start - and the run completes a while later. Can I tweak something so it waits a bit longer before publishing these fake crashes? It's just waiting for resources and recovers a minute later.
m
It seems you're experiencing premature notifications about crashes due to resource waiting times. While I couldn't find a direct setting to delay notifications, you might consider adjusting how state changes trigger these notifications. Here are a few suggestions: 1. State Handlers: You can use custom state handlers to better control when notifications are sent. By defining logic to delay or conditionally trigger notifications, you can avoid premature alerts. 2. Backoff Strategy: Implement a backoff strategy in your state change logic to delay the transition to a "crashed" state. This can be done by checking if the resources are indeed unavailable for a certain period before declaring a crash. 3. Custom Automation: Create a custom automation script that evaluates the state over a period before sending a notification. This can be done using Prefect's Automations API, which allows for more complex logic when dealing with state changes. For more details, you can explore the Automations API documentation. Additionally, reviewing the state management code might offer insights into implementing a delay mechanism. If these solutions don't align with your needs, consider raising an issue or feature request on Prefect's GitHub repository to discuss potential enhancements with the community.
s
@Marvin i'm using prefect 2.20, not 3
m
In Prefect 2.x, handling premature notifications due to state changes can be tricky. Here's what you can consider: 1. Flow run state hooks: Prefect 2.x flows accept on_completion / on_failure / on_crashed hooks, which let you decide in code when a notification should actually be sent (for example, only after a re-check or a short delay), instead of notifying on every raw state transition. 2. Notification state selection: If you are using an automation or notification rule, make sure it only fires on states that truly indicate a problem for you (for example, Failed but not Crashed, if your Crashed states are usually transient pod start-up delays). Here's a basic example of how you might use an on_crashed hook to send the Slack message yourself:
Copy code
python
    from prefect import flow, task
    from prefect.blocks.notifications import SlackWebhook

    def notify_slack(flow, flow_run, state):
        # Called only when the flow run ends in a Crashed state.
        # "my-slack-webhook" is a placeholder block name.
        slack = SlackWebhook.load("my-slack-webhook")
        slack.notify(f"Flow run {flow_run.name} crashed: {state.message}")

    @task
    def my_task():
        pass

    @flow(on_crashed=[notify_slack])
    def my_flow():
        my_task()
3. Review Notifications Documentation: The Prefect 2.x documentation on notifications explains how to set up and customize notifications. It might offer additional insights into managing state-based notifications. 4. Community Discussions: If these solutions don't resolve your issue, consider engaging with the Prefect community or submitting a feature request on GitHub. There might be others with similar challenges who have found workarounds. These steps should help you better control when notifications are sent, reducing false alarms.
s
@Marvin can you re-run a finished
task
through the UI, retroactively? If it's completed I don't see a button. Is that also true for failed ones? (prefect 2.20)
m
In Prefect 2.20, there is no direct option to re-run a completed or failed task through the UI. The UI does not provide a button or feature to re-run individual tasks retroactively once they have finished, regardless of their state (completed or failed). To re-run a task, you would typically need to re-run the entire flow or manually trigger the task using code. This limitation is consistent with the general design of Prefect's task execution model, where tasks are components of flows and are managed within the context of flow runs. For more detailed guidance or workarounds, you might want to refer to the Prefect documentation or consider reaching out to the Prefect community for support.
s
are you here @Marvin?
m
thought for 4.6 seconds
Yep—I'm here and ready to help! What are you working on with Prefect today? If you’ve got an error, code snippet, or goal you’re aiming for, drop it in and I’ll jump in.
s
@Marvin I'm using prefect 2.20 (not 3!) and I have a heavy job that took 3 hours with many sub-flows. Some of them crashed (not sure why.. backoff limit reached, but let's not talk about that) - and I triggered a retry for those 2 specific sub-flows, and they worked! So now I have a parent job that is still in Running state, and 50 sub-flows - all complete - but the parent is sort of waiting and isn't aware that the sub-flows have completed. Is there a way to trigger it to re-check their status and continue running, wrapping up the entire job? The pod is up and costing me money.
m
thought for 743.5 seconds
s
ok @Marvin forget about it - the parent job detected and completed the job! all good
m
thought for 10.2 seconds
Awesome—glad it worked out! If you want to add notifications or a quick check to catch this sooner next time, I’m happy to help. Otherwise, you’re all set.
Short answer: there isn’t a safe way to “poke” a running parent flow in Prefect 2.x to rebind to subflows you re-ran manually. The parent is only waiting on the specific child flow run IDs it launched. If you retriggered children as new runs, the parent won’t ever see those complete. Immediate actions to stop spend - Cancel the parent flow run now (this will tear down the pod):
Copy code
prefect flow-run cancel <PARENT_FLOW_RUN_ID>
You can also click “Cancel” in the UI on the parent run. If the pod doesn’t terminate promptly after cancelling, manually delete the Kubernetes job/pod. Why this happened - If your parent launched child runs via
run_deployment(...)
or
create_flow_run
and then waited on them, it’s waiting on those specific run IDs. - Manually re-running a child from the UI often creates a new run; the parent won’t notice that new run finishing. - One more gotcha:
wait_for_flow_run
has a default 3h timeout. If your parent used it without overriding
timeout
, it may have stopped waiting after 3 hours. What to do next - If you only need the parent marked done (and don’t need wrap-up code to execute), cancelling is the cleanest way to stop the pod. Starting a fresh “wrap-up” flow that checks downstream results (and does any finalization) is safer than trying to force the parent to continue. - If you do need the parent’s wrap-up code to run, re-run the parent with logic that skips work for children that are already Completed. Patterns to use going forward 1) Call subflows as functions (best for tight orchestration)
Copy code
from prefect import flow

@flow
def child(item_id: str):
    ...

@flow
def parent(ids: list[str]):
    for i in ids:
        child.with_options(name=f"child-{i}")(i)  # waits for each child
2) If you use deployments, explicitly wait for the exact child runs you started and remove the timeout
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
def parent(ids: list[str]):
    runs = []
    for i in ids:
        fr = run_deployment(
            name="child-flow/deployment",
            parameters={"id": i},
            as_subflow=True,   # ties the run to this parent
            timeout=None       # do not stop waiting early
        )
        runs.append(fr)

    for fr in runs:
        wait_for_flow_run(fr.id, timeout=None, log_states=True)
3) Prefer configuring retries on the child flow (so the same run ID retries) instead of manually re-running
Copy code
@flow(retries=3, retry_delay_seconds=60)
def child(...):
    ...
A couple questions so I can give you targeted fixes: - How did the parent launch the subflows: direct
@flow
calls or
run_deployment
? - How did you retrigger the children: UI “Retry” on the failed run, or “Run”/“Rerun” on the deployment (creating new runs)? - Are you using a Kubernetes worker/pool for the parent pod? If you share those details or a minimal snippet of the parent orchestration, I can suggest the smallest change to make this recoverable and prevent it in the future.
s
Hey @Marvin - is there a way in prefect 2.20 (UI) to shut off all schedules? I see a toggle per deployment, but I have hundreds of them to shut off (in a dev env where they cost money and aren't needed).
m
thought for 259.0 seconds
Short answer: there isn’t a global “disable all schedules” toggle in the Prefect 2.20 UI. What you can do instead (choose based on your goal): If you just want to stop compute (but don’t mind runs still being scheduled) - Pause your work pools (fewer things to click than hundreds of deployments) - UI: Pause each work pool in the Work Pools page - CLI:
Copy code
prefect work-pool pause "<pool-name>"
- You can also stop/scale down your workers. This prevents any new runs from executing. - Docs: Work pools If you want to stop new scheduled runs from being created - Self-hosted Prefect Server only: disable the scheduler service globally and restart the API - Set env var on your Prefect API service:
Copy code
PREFECT_API_SERVICES_SCHEDULER_ENABLED=false
- This stops all scheduled flow runs from being created until you re-enable it. - Prefect Cloud: there’s no workspace-level “disable scheduler” switch. You’ll need to pause/clear schedules per deployment: - List schedules for a deployment:
Copy code
prefect deployment schedule ls "<deployment-name>"
- Pause a specific schedule:
Copy code
prefect deployment schedule pause "<deployment-name>" <schedule-id>
- Remove all schedules for a deployment (you can re-create them by re-deploying later):
Copy code
prefect deployment schedule clear "<deployment-name>"
- Schedules docs: Schedules in Prefect 2 Notes on bulk operations - There isn’t a one-click bulk pause/clear for all deployments in 2.20. - Practical approaches: - If you have a small number of work pools, pausing pools is the fastest way to stop spend immediately. - If you need to truly stop new runs from being created, you can script across deployments with the CLI (e.g., iterate over deployment names and call
prefect deployment schedule clear "<deployment>"
). I’m happy to provide a small script if you can share how your deployments are named or whether you’re on Mac/Linux/Windows. Quick clarifications to tailor the steps: 1) Are you on Prefect Cloud or self-hosted Prefect Server? 2) Do you want to stop runs from being created at all, or just prevent execution to save compute?
s
@Marvin I'm a bit confused - I have many flows, but there are two toggles (in the UI): one on top (maybe that's the active true/false?) and one under the "Schedules" title (second row) - maybe this is the
schedules
as the UI also shows the specific times there. I assume both need to be on in order to run. Can I easily query all deployments that have BOTH on, to see which are problematically set to run?
m
thought for 130.4 seconds
You’re reading the UI right: - The top toggle is the deployment-level state (Active vs Paused). If a deployment is paused, no schedules for it will create runs. - The toggles under the Schedules section are per-schedule. Each schedule must be enabled to emit runs. - For scheduled runs to be created, the deployment must be Active AND at least one schedule under it must be enabled. There isn’t a single UI filter for “both on” in 2.20, but we can query this programmatically. Two options: Option A: Python (uses the Prefect 2 client to find deployments where paused=False and at least one schedule is active) - Works on both Prefect Cloud and self-hosted, respects your PREFECT_API_URL/PREFECT_API_KEY. If you want, I can drop in a ready-to-run snippet; just let me know: - Are you on Prefect Cloud or self-hosted? - Do you prefer a Python script or a quick-and-dirty shell loop? Option B: CLI + loop (quick check without coding) - Iterate all deployments, skip those paused, and check if any schedule is active. - This is a bit brittle due to table parsing, but I can provide a bash one-liner if that’s easier for your environment. References - Schedules in Prefect 2 - CLI schedule management: - List schedules for a deployment:
Copy code
prefect deployment schedule ls "<deployment-name>"
- Pause a specific schedule:
Copy code
prefect deployment schedule pause "<deployment-name>" <schedule-id>
- Clear all schedules for a deployment:
Copy code
prefect deployment schedule clear "<deployment-name>"
s
@Marvin let's do a python snippet.
m
thought for 73.6 seconds
Great — here’s a ready-to-run Python script that lists all deployments where: - the deployment itself is Active (not paused), and - it has at least one enabled schedule. It works with Prefect 2.20 (Cloud or self-hosted). It uses your configured PREFECT_API_URL and PREFECT_API_KEY.
Copy code
import asyncio
from typing import List, Dict

try:
    # Prefect 2.10+ exposes get_client here
    from prefect.client import get_client
except ImportError:
    # Older 2.x fallback
    from prefect.client.orion import get_client  # type: ignore


async def find_active_deployments_with_enabled_schedules() -> List[Dict]:
    rows = []
    async with get_client() as client:
        offset = 0
        limit = 200

        while True:
            deployments = await client.read_deployments(offset=offset, limit=limit)
            if not deployments:
                break

            for d in deployments:
                # Deployment must be active
                if getattr(d, "paused", False):
                    continue

                # Get full deployment so we can inspect schedules
                full = await client.read_deployment(d.id)

                schedules = getattr(full, "schedules", []) or []
                active_schedules = [s for s in schedules if getattr(s, "active", False)]
                if not active_schedules:
                    continue

                flow_name = getattr(full, "flow_name", None)
                name = f"{flow_name}/{full.name}" if flow_name else full.name
                rows.append(
                    {
                        "name": name,
                        "deployment_id": str(full.id),
                        "work_pool": getattr(full, "work_pool_name", None)
                                      or getattr(full, "work_queue_name", None),
                        "active_schedule_count": len(active_schedules),
                        # Optional: include schedule IDs/times
                        "schedule_ids": [str(s.id) for s in active_schedules],
                    }
                )

            offset += limit

    return rows


if __name__ == "__main__":
    results = asyncio.run(find_active_deployments_with_enabled_schedules())
    print(f"Deployments active with enabled schedules: {len(results)}")
    for r in results:
        print(
            f"- {r['name']} "
            f"(id={r['deployment_id']}, pool={r['work_pool']}, "
            f"active_schedules={r['active_schedule_count']}, "
            f"schedule_ids={r['schedule_ids']})"
        )
If you’d like, I can tweak this to: - dump to CSV/JSON, - include schedule expressions/next times, - or directly pause/clear schedules for everything it finds. Reference: Schedules in Prefect 2
s
this worked @Marvin, thanks! A new question: I have a case where my prefect server is hitting some kind of scheduler DB failure, with this log being repeated:
Copy code
15:28:54.822 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 20.682503 seconds to run, which is longer than its loop interval of 5 seconds.
    result = await greenlet_spawn(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 203, in greenlet_spawn
    result = context.switch(value)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
    return self._execute_internal(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2247, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
    result = conn.execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    return meth(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2358, in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 572, in execute
    self._adapt_connection.await_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 550, in _prepare_and_execute
    self._handle_exception(error)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 501, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 786, in _handle_exception
    raise error
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 538, in _prepare_and_execute
    self._rows = deque(await prepared_stmt.fetch(*parameters))
  File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
    data = await self.__bind_execute(args, 0, timeout)
  File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
    data, status, _ = await self.__do_execute(
  File "/usr/local/lib/python3.9/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
    return await executor(protocol)
  File "asyncpg/protocol/protocol.pyx", line 207, in bind_execute
asyncio.exceptions.TimeoutError
15:28:54.822 | WARNING | prefect.server.services.recentdeploymentsscheduler - RecentDeploymentsScheduler took 20.682503 seconds to run, which is longer than its loop interval of 5 seconds.
this also coincides with a bump we saw in our RDS prefect-DB costs. do you see what the issue is?
m
thought for 267.2 seconds
Short answer: your server’s scheduler is falling behind because its queries to Postgres are timing out. That warning plus the asyncpg TimeoutError means the RecentDeploymentsScheduler is spending 20s+ inside DB calls each cycle (its loop defaults to 5s), which matches your RDS cost spike — the DB is under sustained load doing scheduling work. Likely culprits - High‑frequency schedules (especially RRule patterns like MINUTELY or BYSECOND) that expand a lot of run times each loop; these are known to be costly - See: Scheduler CPU spikes and “RecentDeploymentsScheduler took … longer than loop interval” with RRule schedules - A large scheduling horizon and/or many deployments, causing large batches of run generations per loop - Database contention or insufficient resources (small instance, connection pool mismatch, table bloat/vacuuming) - Multiple Prefect API/services processes each maintaining their own DB pools, multiplying your total connections Quick mitigations to try - If you have any second/minute‑level RRule schedules, switch to Cron/Interval where possible or reduce frequency. These are the most common source of heavy scheduling work. - Reduce the scheduler’s per‑loop work and scheduling horizon: - Increase the loop interval for “recent deployments” - Lower batch sizes and max runs - Narrow the scheduling window (bigger min_scheduled_time and/or smaller max_scheduled_time) Example env vars you can set for the server services container
Copy code
# Slow the loops, reduce work per cycle
PREFECT_SERVER_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_SERVER_SERVICES_SCHEDULER_LOOP_SECONDS=60
PREFECT_SERVER_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_SERVER_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_SERVER_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250

# Narrow the scheduling horizon (defaults: min=PT1H, max=P100D)
PREFECT_SERVER_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_SERVER_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
DB tuning knobs (per server/services process) - Pool sizing: larger pools reduce connection wait, but raise total RDS connections (and cost). Tune carefully.
Copy code
PREFECT_SERVER_DATABASE_POOL_SIZE=10
PREFECT_SERVER_DATABASE_MAX_OVERFLOW=10
PREFECT_SERVER_DATABASE_POOL_TIMEOUT=30
PREFECT_SERVER_DATABASE_POOL_RECYCLE=3600
PREFECT_SERVER_DATABASE_CONNECT_ARGS__APPLICATION_NAME=prefect-server
# Optional: reduce prepared/statement cache sizes if memory pressure is high
PREFECT_SERVER_DATABASE_CONNECT_ARGS__PREPARED_STATEMENT_CACHE_SIZE=100
PREFECT_SERVER_DATABASE_CONNECT_ARGS__STATEMENT_CACHE_SIZE=100
Settings reference: Prefect 3 settings reference Database side checks - Enable slow query logging (e.g., log_min_duration_statement) or use RDS Performance Insights to confirm which statements are hot; look for scheduling queries hitting deployments and schedules tables. - Ensure autovacuum is effective; vacuum/analyze if there’s bloat. - If needed, increase DB instance size/IO temporarily to clear backlog. - Consider setting a Postgres statement_timeout at the DB/parameter-group level to fail fast on pathological queries. Upgrade note - If you’re not on the latest 3.x, upgrading is recommended — we’ve shipped numerous scheduler and DB performance improvements over time. Release notes: Prefect releases A few clarifying questions to pinpoint this faster - Which Prefect version are you on? - Roughly how many deployments and which schedule types are you using? Any RRule schedules with MINUTELY/BYSECOND? - How many Prefect API/services processes are running (each one has its own connection pool)? - Any recent growth in flow_runs or events volume around when the RDS cost spiked? If you can share your scheduler-related env settings and a sample of the schedule definitions that run most frequently, I can suggest a tighter set of values tailored to your load.
s
@Marvin you're talking about prefect 2.20, right? Just making sure. I connected to the server (through OpenLens) to look at some of those variables and they don't exist. Does this make sense? What are the defaults?
m
thought for 76.6 seconds
Thanks for confirming — got it, you’re on Prefect 2.20. Why you didn’t see those env vars - That makes sense: in 2.x most scheduler/database settings are optional. If you don’t set the environment variables, Prefect uses built‑in defaults and nothing will show up in the pod’s env. - Also, 2.x and 3.x use different env var prefixes. In 2.x they start with `PREFECT_API_...`; in 3.x they start with
PREFECT_SERVER_...
. If you were looking for
PREFECT_SERVER_*
keys in a 2.x server, they won’t exist. Yes, the log name is expected - The log component
prefect.server.services.recentdeploymentsscheduler
exists in 2.x as well. The warning “RecentDeploymentsScheduler took X seconds…” means that pass of scheduling took longer than its loop interval (the “recent deployments” loop defaults to 5 seconds in 2.x). Prefect 2.20 scheduler defaults and env var names - These are the common 2.x knobs and their defaults. If you don’t set the env vars, Prefect uses these defaults: Scheduler service (2.x) -
PREFECT_API_SERVICES_SCHEDULER_ENABLED
= true -
PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS
= 60 -
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS
= 5 -
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE
= 100 -
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE
= 500 -
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS
= 100 -
PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS
= 3 -
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME
= PT1H (1 hour) -
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME
= P100D (100 days) Database pool (2.x) -
PREFECT_API_DATABASE_POOL_SIZE
= 5 -
PREFECT_API_DATABASE_MAX_OVERFLOW
= 10 -
PREFECT_API_DATABASE_POOL_TIMEOUT
= 30 -
PREFECT_API_DATABASE_POOL_RECYCLE
= 3600 -
PREFECT_API_DATABASE_CONNECTION_URL
= your DSN (this is the one you typically have set) How to view the effective config (with defaults) - Exec into the API/server pod and run:
Copy code
prefect config view --show-defaults | grep -i -e api.services.scheduler -e api.database -e SCHEDULER -e DATABASE
- You can also narrow in on scheduler:
Copy code
prefect config view --show-defaults | sed -n '/api.services.scheduler/,+25p'
Why the warning + RDS cost spike - In 2.20 this usually means the scheduler’s DB queries are slow/expensive — often due to frequent schedules (e.g., RRule MINUTELY/BYSECOND), a wide scheduling horizon, many deployments, or DB resource/connection pressure. Each pass tries to generate upcoming runs; if that work takes > loop interval, it falls behind and keeps hammering the DB. If you want, share: - How many deployments and which schedule types you have (especially any RRule schedules) - Your DB instance size and observed connections - Whether you’re running multiple API/services pods (each has its own pool) I can suggest concrete 2.x tuning values to reduce load. Also, if/when you’re ready, upgrading to 3.x includes additional scheduler and DB performance improvements; I’m happy to outline a path when it’s helpful. Refs - 2.x settings guide: Prefect 2 settings - 2.x schedules overview: Schedules in Prefect 2
s
@Marvin this is the current config:
Copy code
PREFECT_API_DATABASE_CONNECTION_TIMEOUT='30.0' (from env)
PREFECT_API_DATABASE_CONNECTION_URL='********' (from env)
PREFECT_API_DATABASE_ECHO='False' (from defaults)
PREFECT_API_DATABASE_MIGRATE_ON_START='True' (from defaults)
PREFECT_API_DATABASE_PASSWORD='********' (from defaults)
PREFECT_API_DATABASE_TIMEOUT='10.0' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE='100' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_ENABLED='True' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE='500' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS='60.0' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS='100' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME='100 days, 0:00:00' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS='3' (from defaults)
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME='1:00:00' (from defaults)
I'm still not sure what the root cause is: if we're not running anything crazy on the server, why wouldn't 20 seconds be enough for a (simple) query? What should I do to get things aligned?
m
thought for 47.2 seconds
Great, thanks for sharing the effective config. You’re on Prefect 2.20 and your values look like the 2.x defaults, which helps. Why this can still take >20s - The scheduler isn’t just “a simple query.” Each loop: - Reads deployments/schedules to compute upcoming runs across a time horizon - Inserts new flow runs (batched) while avoiding duplicates - Repeats for “recently updated deployments” in a fast loop (defaults to 5s) - If tables are large, poorly indexed, bloated, or locked, these steps can stall. Even without high run volume, a wide scheduling horizon + many deployments/schedules or minute/second-level schedules can cause heavy work. - Your
PREFECT_API_DATABASE_TIMEOUT=10.0
means any one DB operation exceeding 10s will time out at the client. That matches the asyncpg TimeoutError you’re seeing. The 20s loop time includes retries/await plus the rest of the loop. Most actionable next steps Start with visibility so you fix the real cause rather than just raising timeouts. 1) Identify the slow/blocked queries - Temporarily enable slow query logging on RDS (e.g., set log_min_duration_statement=500ms) or use Performance Insights to see the top statements during scheduler spikes. - Check for locks/contention: - Query pg_stat_activity for waiting queries:
Copy code
SELECT pid, wait_event_type, wait_event, state, query
    FROM pg_stat_activity
    WHERE datname = current_database()
    ORDER BY state, wait_event_type;
- If you can enable pg_stat_statements, look for the top queries by total time. 2) Reduce scheduler work per loop (2.x env vars) - These are 2.x names (prefix PREFECT_API_…). Try these first:
Copy code
# Slow the “recent deployments” loop (default is 5s)
  PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30

  # Reduce per-loop work and batching
  PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
  PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
  PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250

  # Narrow the scheduling horizon (defaults: min=PT1H, max=P100D)
  PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
  PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
- These changes reduce how much the scheduler must compute/insert each pass. 3) Loosen the client timeout while you tune - Your client timeout is currently 10s:
Copy code
PREFECT_API_DATABASE_TIMEOUT=30
- Raising to 30–60 can stop spurious timeouts while you address the root cause. Don’t leave it too high long-term if queries are truly slow. 4) Right-size DB pooling and check total connections - Defaults are modest but multiply by the number of API/services pods. If you have several pods, connection counts can spike.
Copy code
PREFECT_API_DATABASE_POOL_SIZE=5        # or 10 if your DB can handle it
  PREFECT_API_DATABASE_MAX_OVERFLOW=10
  PREFECT_API_DATABASE_POOL_TIMEOUT=30
  PREFECT_API_DATABASE_POOL_RECYCLE=3600
- Keep an eye on RDS max connections and CPU/IO. If the DB is saturated, the scheduler will suffer. 5) Check schedules and data volume - Do you have any minute/second-level RRule schedules (e.g., MINUTELY, BYSECOND)? Those are known to be heavy. Prefer Cron/Interval schedules or reduce frequency. - If you have millions of old flow_runs/flow_run_states/logs, scheduling queries can degrade. Ensure autovacuum/analyze are effective; consider manual VACUUM (FULL only with downtime) and REINDEX if bloat is high. 6) Optional: surface queries from Prefect - For a short diagnostic window, you can set:
Copy code
PREFECT_API_DATABASE_ECHO=true
This logs SQL from the API process. It’s very verbose; use briefly in non-peak periods. What I’d do in your shoes - Immediately bump
PREFECT_API_DATABASE_TIMEOUT
to 30, and set:
Copy code
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
- In parallel, enable RDS slow query logging or use Performance Insights to identify the exact hot statements and whether you're hitting locks or table scans. - If you do have minute/second-level RRules, rewrite or reduce their frequency. References - 2.x settings: Prefect 2 settings - Known report on scheduler heaviness with RRule: Scheduler CPU spikes with RRule schedules If you can share roughly: - Number of deployments and schedule patterns - Approx row counts in flow_runs/flow_run_states - Number of API/services pods I can suggest more tailored values and help interpret any slow query data you find.
s
@Marvin OK I'm connected to the DB. I see 9GB tables (task_run_state, flow_run_state); the rest are smaller. What is the query that gets stuck? Can you tell from the logs? Anything I can easily clean?
m
thought for 92.4 seconds
Thanks — that size on
flow_run_state
/
task_run_state
can definitely slow the scheduler. What query is “stuck”? - From the log snippet alone, we can’t see the exact SQL — Prefect 2.x does not log SQL by default. The asyncpg TimeoutError just tells us a DB call exceeded your
PREFECT_API_DATABASE_TIMEOUT
(10s in your config). - To identify the exact statement, use Postgres visibility: - See currently running/waiting queries:
Copy code
SELECT pid,
           now() - query_start AS runtime,
           state,
           wait_event_type,
           wait_event,
           application_name,
           query
    FROM pg_stat_activity
    WHERE datname = current_database()
      AND state <> 'idle'
    ORDER BY query_start;
- If you can, enable pg_stat_statements and get the top time-consuming statements:
Copy code
-- enable once in your DB/parameter group:
    -- CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

    SELECT total_exec_time, mean_exec_time, calls, query
    FROM pg_stat_statements
    ORDER BY total_exec_time DESC
    LIMIT 20;
- RDS Performance Insights (if enabled) will also show you the exact SQL and wait events.
Optional: label Prefect connections so they’re easy to spot
- Add an application name to your DSN so queries show up labeled:
- Append ?application_name=prefect-api to PREFECT_API_DATABASE_CONNECTION_URL
- After rolling the pod, look for application_name = 'prefect-api' in pg_stat_activity.
What the scheduler typically does
- Reads deployments with active schedules
- Computes upcoming run times (in Python)
- Inserts new rows into flow_runs and corresponding states in batches
- Checks for duplicates/limits
If the state/run tables are very large and bloated or missing stats, even simple existence checks and inserts can slow down.
Safe ways to reduce table size quickly
1) Delete old flow runs (cascades states and task runs)
- In 2.x, FK constraints cascade states and task runs when you delete flow runs. If you’re comfortable purging old history, this is the fastest way to shrink the *_state tables.
- Example: delete completed flow runs older than 30 days. Adjust filters to your needs and test on a small window first.
Copy code
-- recommended: create an index if not present to help this query:
  -- CREATE INDEX IF NOT EXISTS idx_flow_runs_end_time ON flow_runs (end_time);

  DELETE FROM flow_runs
  WHERE end_time IS NOT NULL
    AND end_time < now() - interval '30 days';
This will cascade delete from flow_run_states, task_runs, and task_run_states via FKs.
2) Prune historical states older than N days while keeping the current state
- If you want to keep runs but trim old state history:
Copy code
-- Flow run states: keep the state referenced by flow_runs.state_id, delete older ones
  DELETE FROM flow_run_states s
  USING flow_runs r
  WHERE s.flow_run_id = r.id
    AND s.id <> r.state_id
    AND s.timestamp < now() - interval '30 days';

  -- Task run states: same idea
  DELETE FROM task_run_states s
  USING task_runs r
  WHERE s.task_run_id = r.id
    AND s.id <> r.state_id
    AND s.timestamp < now() - interval '30 days';
Notes:
- Ensure timestamp exists in your schema (it does in 2.x).
- Do this in batches to avoid long locks, e.g., add LIMIT with a CTE or slice by time ranges.
3) Maintenance after purge
- Update stats and reclaim space:
Copy code
VACUUM (ANALYZE) flow_runs;
  VACUUM (ANALYZE) flow_run_states;
  VACUUM (ANALYZE) task_runs;
  VACUUM (ANALYZE) task_run_states;
For significant bloat and if you can afford locks/downtime: VACUUM FULL on the biggest tables, or REINDEX TABLE ....
Tune the scheduler/db to stop the bleeding
- In 2.x (your case), try these environment variables:
```
# Slow the “recent deployments” loop to reduce thrash
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
# Reduce work per pass
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
# Narrow the scheduling window (defaults: min=PT1H, max=P100D)
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
# Give DB calls more breathing room while you clean up
PREFECT_API_DATABASE_TIMEOUT=30
```
- Also confirm you’re not running multiple API/services pods each with their own pool; total connections add up.
Find and fix the exact slow plan
- Once you capture the slow statements via pg_stat_activity or Performance Insights, run EXPLAIN (ANALYZE, BUFFERS) on them to see if you need additional indexes. Common helpful indexes (sketched as CREATE INDEX statements below):
- flow_runs(deployment_id)
- flow_runs(end_time)
- flow_run_states(flow_run_id, timestamp)
- task_run_states(task_run_id, timestamp)
- If you share the slow query text, I can suggest the exact index.
Can logs show the SQL directly?
- Not by default. You can temporarily set PREFECT_API_DATABASE_ECHO=true to log SQL from the API process (very verbose — use briefly and preferably off-peak).
If you’d like, post:
- One or two of the longest-running queries you see in pg_stat_activity or Performance Insights
- Approximate row counts of flow_runs, task_runs
I’ll help interpret and propose precise indexes and a pruning plan.
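If EXPLAIN confirms any of those are missing, they translate to statements like the following (a sketch — the index names are arbitrary placeholders, and your schema may use the singular table names flow_run_state/task_run_state):
```
-- Index names are placeholders; verify against EXPLAIN output before creating
CREATE INDEX IF NOT EXISTS idx_flow_runs_deployment_id ON flow_runs (deployment_id);
CREATE INDEX IF NOT EXISTS idx_flow_runs_end_time ON flow_runs (end_time);
CREATE INDEX IF NOT EXISTS idx_flow_run_states_run_ts ON flow_run_states (flow_run_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_task_run_states_run_ts ON task_run_states (task_run_id, timestamp);
```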
s
@Marvin no open query. i want to delete cancelled tasks, not the completed ones. they are still not the majority though (most are completed). but i mostly not sure: why is flow-run-state table that huge? data is null, and there aren't many runs (flow-runs is only 9M, compared to 9GB) to begin with, why is it so chubby storage in the "state"? can i delete stuff?
m
thought for 54.4 seconds
Totally fair questions.
Why flow_run_states/task_run_states get huge
- Every run accumulates a history of states. One flow run might create several flow_run_states over its lifetime; each task run can create even more (PENDING → SCHEDULED → RUNNING → RETRYING → RUNNING → COMPLETED/FAILED/CANCELLED, etc.). Large DAGs multiply this quickly.
- The “data” column being NULL is normal; Prefect 2 doesn’t persist results by default. Table size comes from:
  - many rows (state history)
  - indexes on those rows
  - dead tuples/bloat from frequent inserts/updates/deletes if autovacuum can’t keep up
  - TOAST for large JSON fields (e.g., state_details) when present
- If tables have been busy for a long time, they can become “chubby” even if current live rows aren’t massive, due to bloat. You’ll only reclaim disk with VACUUM FULL/pg_repack after deletions.
Quick visibility to confirm
- Check live vs dead tuples and total size:
Copy code
SELECT relname, n_live_tup, n_dead_tup
FROM pg_stat_user_tables
WHERE relname IN ('flow_run_states','task_run_states');

SELECT relname, pg_size_pretty(pg_total_relation_size(relid)) AS total
FROM pg_catalog.pg_statio_user_tables
WHERE relname IN ('flow_run_states','task_run_states');

-- average states per run (sanity checks)
SELECT
  (SELECT COUNT(*) FROM flow_run_states) AS flow_state_rows,
  (SELECT COUNT(*) FROM flow_runs) AS flow_runs,
  ROUND((SELECT COUNT(*) FROM flow_run_states)::numeric / GREATEST((SELECT COUNT(*) FROM flow_runs),1), 2) AS avg_flow_states_per_run;

SELECT
  (SELECT COUNT(*) FROM task_run_states) AS task_state_rows,
  (SELECT COUNT(*) FROM task_runs) AS task_runs,
  ROUND((SELECT COUNT(*) FROM task_run_states)::numeric / GREATEST((SELECT COUNT(*) FROM task_runs),1), 2) AS avg_task_states_per_run;
If n_dead_tup is large, you’ll get immediate wins from pruning + vacuum.
You want to delete cancelled tasks (not completed)
Here are safe, targeted cleanup options for Prefect 2.x. Always test on a small time window first and take a backup/snapshot before large deletes.
Option A: Delete cancelled task runs older than N days (cascades their states)
- This removes the entire cancelled task run rows and their state history, but keeps other runs intact.
Copy code
-- Example: older than 30 days; adjust as needed
-- Helpful index if missing: CREATE INDEX IF NOT EXISTS idx_task_runs_end_time ON task_runs (end_time);

WITH doomed AS (
  SELECT tr.id
  FROM task_runs tr
  JOIN task_run_states s ON s.id = tr.state_id
  WHERE s.type IN ('CANCELLED','CANCELLING')
    AND tr.end_time IS NOT NULL
    AND tr.end_time < now() - interval '30 days'
  LIMIT 10000
)
DELETE FROM task_runs tr
WHERE tr.id IN (SELECT id FROM doomed);

-- Repeat the DELETE until it affects 0 rows to process in batches.
Option B: Keep runs but prune old state history (big win for table size) - Keep the current state each run points to; delete older historical states for cancelled task runs only (or do this for all task runs if you want maximal reduction).
Copy code
-- Prune task_run_states for CANCELLED runs, keeping the current state
-- (Postgres DELETE has no LIMIT clause, so pick a batch in a CTE first)
WITH doomed AS (
  SELECT s.id
  FROM task_run_states s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_states cur ON cur.id = r.state_id
  WHERE cur.type IN ('CANCELLED','CANCELLING')
    AND s.id <> r.state_id
    AND s.timestamp < now() - interval '30 days'
  LIMIT 50000
)
DELETE FROM task_run_states
WHERE id IN (SELECT id FROM doomed);  -- run in batches until 0 rows
- You can run a similar prune for flow_run_states if desired:
Copy code
WITH doomed AS (
  SELECT s.id
  FROM flow_run_states s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_states cur ON cur.id = r.state_id
  WHERE cur.type IN ('CANCELLED','CANCELLING')
    AND s.id <> r.state_id
    AND s.timestamp < now() - interval '30 days'
  LIMIT 50000
)
DELETE FROM flow_run_states
WHERE id IN (SELECT id FROM doomed);  -- batch; repeat until 0 rows
After cleanup, reclaim space - Update stats so planners pick the right indexes:
Copy code
VACUUM (ANALYZE) task_run_states;
VACUUM (ANALYZE) flow_run_states;
VACUUM (ANALYZE) task_runs;
VACUUM (ANALYZE) flow_runs;
- To truly shrink on-disk size, consider:
- VACUUM FULL task_run_states; VACUUM FULL flow_run_states; (requires locks/downtime)
- or use pg_repack for online compaction
Why “no open query” but scheduler times out?
- The timeouts happen during scheduler spikes; you may have checked pg_stat_activity between spikes. Adding an application_name to your DSN helps you spot Prefect queries quickly:
- Append ?application_name=prefect-api to PREFECT_API_DATABASE_CONNECTION_URL
- If you still see no active queries at the time of warnings, the bottleneck may be intermittent (autovacuum, IO contention) or the scheduler loop is delayed by something else. Enabling RDS Performance Insights or pg_stat_statements for a day will typically reveal the heavy statements.
Two more tips that usually help
- Narrow the scheduler horizon and reduce per-loop work (2.x env vars):
Copy code
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
- Ensure autovacuum keeps up. If n_dead_tup is high for the states tables, consider increasing autovacuum aggressiveness for those tables (an ALTER TABLE sketch follows below) or scheduling periodic manual VACUUM.
If you can share:
- Whether “9M” for flow_runs is rows or 9 MB
- The avg states per run from the queries above
- A sample slow statement from Performance Insights/pg_stat_statements
I can recommend exact indexes and a tailored pruning cadence (and convert the deletes into a scheduled job).
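If dead tuples do turn out to be high, per-table autovacuum settings are one way to make it more aggressive (a sketch — the thresholds are illustrative, and your schema may use the singular table names):
```
-- Vacuum/analyze after ~2%/1% of rows change instead of the 20%/10% defaults (illustrative values)
ALTER TABLE task_run_states SET (autovacuum_vacuum_scale_factor = 0.02, autovacuum_analyze_scale_factor = 0.01);
ALTER TABLE flow_run_states SET (autovacuum_vacuum_scale_factor = 0.02, autovacuum_analyze_scale_factor = 0.01);
```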
s
@Marvin i want to go with option B (removing all temp states and leaving only the final ones). i see that your query is a bit off (the task_run_states table is actually called task_run_state), but i fixed it and ran a count instead of the delete (with a join instead of USING) - about 40% of the records will be deleted (great!).
answering your questions:
- 9M means 9MB, but it's also around that in row count
- dead tuples are super small (2 i think)
- average flow-states per run: 11307.28
- average task-states per run: 7601.34
m
thought for 63.4 seconds
Got it — and great catch on the table names. Please adapt the queries below to your exact schema (plural vs singular). I’ll show them in the canonical Prefect 2.x schema and note where to change.
Your averages are the smoking gun
- Avg flow states per run: 11,307
- Avg task states per run: 7,601
That’s extraordinarily high. Even without table bloat, that row volume will slow scheduling checks. Pruning historical states will help a lot, and we should later look for the cause of the state “storm” so it doesn’t recur.
Prune state history: keep only the current/final state for terminal runs
- Keep the state that each run currently points to (r.state_id) when the run is terminal (COMPLETED/FAILED/CANCELLED/CRASHED). Delete all other historical states for that run.
- Do it in batches to avoid long locks. Run repeatedly until 0 rows deleted.
Flow run states (canonical names; change to your singular names if needed)
Copy code
-- Optional: add indexes to speed joins if missing
-- CREATE INDEX IF NOT EXISTS idx_flow_run_states_flow_run_id ON flow_run_states (flow_run_id);
-- CREATE INDEX IF NOT EXISTS idx_flow_runs_state_id ON flow_runs (state_id);

WITH doomed AS (
  SELECT s.id
  FROM flow_run_states s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_states cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM flow_run_states
WHERE id IN (SELECT id FROM doomed);
Task run states
Copy code
-- Optional indexes
-- CREATE INDEX IF NOT EXISTS idx_task_run_states_task_run_id ON task_run_states (task_run_id);
-- CREATE INDEX IF NOT EXISTS idx_task_runs_state_id ON task_runs (state_id);

WITH doomed AS (
  SELECT s.id
  FROM task_run_states s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_states cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM task_run_states
WHERE id IN (SELECT id FROM doomed);
Notes
- Replace `flow_run_states`/`task_run_states` with your actual table names (`flow_run_state`/`task_run_state`) if that’s what your schema uses.
- You can add an age filter if you want to be extra safe: AND s.timestamp < now() - interval '7 days'
- After each batch, re-run until 0 rows deleted.
Follow-up maintenance
Copy code
VACUUM (ANALYZE) flow_run_states;
VACUUM (ANALYZE) task_run_states;
-- For significant space reclaim (requires locks/downtime):
-- VACUUM FULL flow_run_states;
-- VACUUM FULL task_run_states;
If downtime is hard, consider pg_repack for online compaction. Quick sanity checks before/after
Copy code
-- Find the worst offenders (runs with the most states)
SELECT flow_run_id, COUNT(*) AS n
FROM flow_run_states
GROUP BY flow_run_id
ORDER BY n DESC
LIMIT 20;

SELECT task_run_id, COUNT(*) AS n
FROM task_run_states
GROUP BY task_run_id
ORDER BY n DESC
LIMIT 20;
Why did states balloon?
Averages in the thousands suggest something is repeatedly flipping states. Common causes:
- High retry counts with short delays or tasks that keep re-entering RUNNING/SCHEDULED
- Processes repeatedly re-attaching to the same runs after restarts (workers crashing/restarting)
- Custom code or triggers that set states frequently
- Services like “late runs” thrashing on problematic runs
If you can share a couple of the top offending run IDs and their state type distribution, we can diagnose:
Copy code
-- Distribution of state types for a specific run
SELECT type, COUNT(*) 
FROM flow_run_states 
WHERE flow_run_id = '<uuid>'
GROUP BY type
ORDER BY COUNT(*) DESC;
To prevent recurrence
- Keep a periodic prune job (weekly/daily) that keeps only the current state for terminal runs (queries above without LIMIT).
- Review retry policies and ensure they’re not excessively aggressive.
- Check for worker restarts/health that could cause repeated transitions.
- Narrow scheduler work temporarily while you clean up:
Copy code
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
If you want, paste one example run’s state-type breakdown and your retry config; I’ll help pinpoint the pattern and suggest guardrails so the states table doesn’t balloon again.
s
@Marvin flow run ids top-offenders :
Copy code
dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5	655003
d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079	652791
1c57c018-8154-4f5b-b0d5-c53e325207ff	455405
2fff65a2-647f-425e-910a-386b98648986	422391
4e9a8f4b-53e9-4b0c-890d-7d71a5df079b	422391
bbca4c50-81f9-4b8b-bc74-e3f7c9042335	422390
breakdown of the first one:
Copy code
CANCELLING	655004
SCHEDULED	1
PENDING	1
RUNNING	1
CRASHED	1
results for the second one are the same.. perhaps db-load (which we are about to fix) is what's causing the cancelling state-switches?
m
thought for 119.8 seconds
Whoa — 655k CANCELLING states on a single flow run is the smoking gun. That’s not normal churn; something is repeatedly re-applying CANCELLING. DB load can exacerbate it (the finalization to CANCELLED may never complete), but the core issue is a loop that keeps writing CANCELLING.
What I’d do now
1) Force-finalize the top offenders to stop the bleed
- For each of those flow run IDs, try to force them to CANCELLED so Prefect stops re-asserting CANCELLING:
Copy code
prefect flow-run cancel dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5
prefect flow-run cancel d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079
...
- Then confirm the current state:
Copy code
SELECT r.id, s.type, s.timestamp
FROM flow_runs r
JOIN flow_run_state s ON s.id = r.state_id
WHERE r.id IN ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5','d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079');
2) Prune historical states, keeping only the final state - Use your singular table names. Run in batches until no more rows are deleted. This keeps only the final state for terminal runs.
Copy code
-- Flow runs: keep only the current terminal state
WITH doomed AS (
  SELECT s.id
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);

-- Task runs: same pattern
WITH doomed AS (
  SELECT s.id
  FROM task_run_state s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
- Optional safety: also add AND s.timestamp < now() - interval '7 days' if you want to keep the most recent history.
- Maintenance after pruning:
Copy code
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- For real space reclaim (locks/downtime): VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
3) Identify who/what is re-asserting CANCELLING - Look for repeated cancellation requests in the state messages/details:
Copy code
SELECT message, COUNT(*)
FROM flow_run_state
WHERE flow_run_id = 'dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5'
  AND type = 'CANCELLING'
GROUP BY message
ORDER BY COUNT(*) DESC
LIMIT 10;

SELECT state_details, COUNT(*)
FROM flow_run_state
WHERE flow_run_id = 'dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5'
  AND type = 'CANCELLING'
GROUP BY state_details
ORDER BY COUNT(*) DESC
LIMIT 10;
- Common culprits:
  - An external script repeatedly calling the cancel API for the same run
  - The cancellation cleanup service re-touching runs that never finalize because workers are offline or failing to report CANCELLED
  - Worker restarts or crashes causing repeated “cancel requested” handling
- Check worker logs for these run IDs and the server logs around when CANCELLING spikes.
4) Temporarily reduce/disable the cancellation churn while you clean
- If available in your 2.20 build, you can slow or temporarily disable cancellation cleanup to stop the flood while you prune. Look for an env like:
Copy code
PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED=false
- If that setting isn’t present in your 2.20 image, slow the scheduler loops and raise the DB timeout while you clean:
Copy code
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
5) Make it observable
- Tag Prefect DB connections so you can spot them:
- Append ?application_name=prefect-api to PREFECT_API_DATABASE_CONNECTION_URL
- If needed briefly, enable SQL echo to correlate behavior:
Copy code
PREFECT_API_DATABASE_ECHO=true
Use sparingly (very verbose).
Your numbers confirm the plan
- Avg 11k flow states/run and 7.6k task states/run = extreme churn.
- Dead tuples near zero → not (just) bloat; these are live rows. Pruning is the right fix.
- Once you force-finalize the worst runs, pruning will cut the states table dramatically. Then VACUUM/pg_repack to reclaim space.
If you share:
- The message/state_details sample from the CANCELLING rows
- Whether those runs’ current state is CANCELLING or CANCELLED after prefect flow-run cancel
I can help pinpoint the source and suggest a guardrail so this doesn’t recur.
s
@Marvin the message is only
The parent flow run was cancelled.
m
thought for 93.2 seconds
That message pinpoints it: these are child runs being repeatedly put into CANCELLING because “The parent flow run was cancelled.” So the cancellation enforcer keeps reasserting CANCELLING on the children and never finalizes them, creating hundreds of thousands of new state rows.
DB load can absolutely exacerbate this: if finalization (moving to CANCELLED) can’t complete fast enough, the cleanup/enforcer will keep touching the same runs and append more CANCELLING states.
Here’s a focused plan to stop the churn, clean up safely, and prevent recurrence.
Immediate containment
1) Force-finalize the worst offenders
- Try to move them from CANCELLING to CANCELLED via CLI so the enforcer stops touching them:
Copy code
prefect flow-run cancel dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5
prefect flow-run cancel d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079
prefect flow-run cancel 1c57c018-8154-4f5b-b0d5-c53e325207ff
- Verify:
Copy code
SELECT r.id, s.type, s.timestamp
FROM flow_runs r
JOIN flow_run_state s ON s.id = r.state_id
WHERE r.id IN ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5','d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079','1c57c018-8154-4f5b-b0d5-c53e325207ff');
2) Optional: slow cancellation enforcement temporarily
- If available in your 2.20 image, you can slow or pause cancellation cleanup while you clean to stop re-assertions. If not present, at least raise PREFECT_API_DATABASE_TIMEOUT and reduce scheduler work temporarily as shared earlier to reduce the rate of reassertion.
- Ref: Prefect 2 settings
Prune state history (Option B) — tuned for your singular table names
A) Terminal runs: keep only the current terminal state; delete all older states
Copy code
WITH doomed AS (
  SELECT s.id
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
Copy code
WITH doomed AS (
  SELECT s.id
  FROM task_run_state s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
B) Runs still stuck in CANCELLING: keep only the latest CANCELLING state - First, ensure flow_runs.state_id points to the latest CANCELLING state for consistency:
Copy code
WITH latest AS (
  SELECT DISTINCT ON (s.flow_run_id) s.id, s.flow_run_id, s.timestamp
  FROM flow_run_state s
  WHERE s.type = 'CANCELLING'
  ORDER BY s.flow_run_id, s.timestamp DESC
)
UPDATE flow_runs r
SET state_id = latest.id
FROM latest, flow_run_state cur
WHERE r.id = latest.flow_run_id
  AND cur.id = r.state_id
  AND cur.type = 'CANCELLING'
  AND r.state_id <> latest.id;
- Then delete older CANCELLING rows:
Copy code
WITH latest AS (
  SELECT flow_run_id, MAX(timestamp) AS ts
  FROM flow_run_state
  WHERE type = 'CANCELLING'
  GROUP BY flow_run_id
),
doomed AS (
  -- pick the batch here: Postgres DELETE has no LIMIT clause
  SELECT s.id
  FROM flow_run_state s
  JOIN latest l ON l.flow_run_id = s.flow_run_id
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE s.type = 'CANCELLING'
    AND cur.type = 'CANCELLING'
    AND s.timestamp < l.ts
  LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
- Run these in batches until 0 rows deleted. Then:
Copy code
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- For disk reclaim (requires locks/downtime): VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
Find the parent and the cascade pattern
- For one offender, identify its parent and siblings — verify the parent is CANCELLED and the children are the ones churning:
```
-- Find the parent flow run of a child subflow
SELECT parent.id AS parent_flow_run_id, child.id AS child_flow_run_id
FROM flow_runs child
JOIN task_runs t ON child.parent_task_run_id = t.id
JOIN flow_runs parent ON t.flow_run_id = parent.id
WHERE child.id = 'dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5';

-- Are siblings also stuck cancelling?
SELECT child.id, cur.type, cur.timestamp
FROM flow_runs child
JOIN task_runs t ON child.parent_task_run_id = t.id
JOIN flow_run_state cur ON cur.id = child.state_id
WHERE t.flow_run_id = '<parent_flow_run_id>'
ORDER BY cur.type, cur.timestamp DESC
LIMIT 50;
```
- If you see many children in CANCELLING, finalize them (CLI cancel) and then prune.

Prevent recurrence
- Ensure workers/infrastructure actually terminate runs on cancel. If you’re using Docker/K8s/Dask/Ray, verify your cancellation behavior kills child processes; otherwise the server will keep re-requesting cancellation.
- Add run-level timeouts and reasonable retry policies to avoid long-running stuck runs.
- Keep a periodic prune job (daily/weekly) that:
  - For terminal runs: keeps only current state
  - For runs stuck in CANCELLING for > N minutes: keeps only the latest CANCELLING state or proactively finalizes them to CANCELLED (a query to spot such runs is sketched at the end of this message)
- While you clean up, reduce scheduler pressure and bump DB timeout:
```
PREFECT_API_SERVICES_SCHEDULER_RECENT_DEPLOYMENTS_LOOP_SECONDS=30
PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS=50
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE=50
PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE=250
PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME=PT2H
PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME=P7D
PREFECT_API_DATABASE_TIMEOUT=30
```
Refs
- Settings: Prefect 2 settings
- Related scheduler heaviness report: RRule schedules causing scheduler CPU and long loops
If you can share whether those children finalize to CANCELLED after the CLI cancel, we can confirm the loop is broken and then aggressively prune the historical CANCELLING rows.
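For the “stuck in CANCELLING for more than N minutes” guardrail above, a query along these lines lists candidates to finalize (the 30-minute threshold is illustrative):
```
SELECT r.id, s.timestamp AS cancelling_since
FROM flow_runs r
JOIN flow_run_state s ON s.id = r.state_id
WHERE s.type = 'CANCELLING'
  AND s.timestamp < now() - interval '30 minutes'
ORDER BY s.timestamp;
```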
s
@Marvin i've executed the first prefect flow-run cancel but it's still only SCHEDULED for cancellation - and status in UI is still Cancelling, so this is not setting the final state for real as you mentioned.
m
thought for 71.5 seconds
You’re right: prefect flow-run cancel only sets the run to CANCELLING. Normally, a worker or the server’s cleanup service will finalize it to CANCELLED. Since yours are stuck, the fastest way is to force the final state.
Two ways to force-finalize stuck runs to CANCELLED
A) Use the REST API with force=true
- Adjust for your API URL shape. If your PREFECT_API_URL already ends with /api, use it directly; otherwise add /api. If you use auth, add the Authorization header.
Copy code
curl -X POST "$PREFECT_API_URL/flow_runs/dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5/set_state?force=true" \
  -H "Content-Type: application/json" \
  -d '{"state": {"type":"CANCELLED","name":"Cancelled","message":"Forced cancel"}}'
- Repeat for each offender. Then verify:
Copy code
SELECT r.id, s.type, s.timestamp
FROM flow_runs r
JOIN flow_run_state s ON s.id = r.state_id
WHERE r.id IN ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5','d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079');
B) Use a tiny Python script (Prefect 2.x)
Copy code
import asyncio
from prefect.client.orchestration import PrefectClient
from prefect.states import Cancelled

RUN_IDS = [
    "dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5",
    "d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079",
    # ...
]

async def main():
    async with PrefectClient() as client:
        for rid in RUN_IDS:
            state = Cancelled(name="Forced cancel")
            await client.set_flow_run_state(rid, state=state, force=True)
            print("Forced CANCELLED:", rid)

asyncio.run(main())
- This bypasses orchestration guardrails and sets the final state. Then prune the storm of CANCELLING states (Option B) - Keep only the current state for terminal runs and delete older history. Since your schema uses singular names, use these:
Copy code
WITH doomed AS (
  SELECT s.id
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
Copy code
WITH doomed AS (
  SELECT s.id
  FROM task_run_state s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
- Run in batches until 0 rows; then:
Copy code
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- For real disk reclaim (locks/downtime):
-- VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
If a run is still in CANCELLING after the forced API call
- Confirm the HTTP response was 200 and that the request body included "force": true.
- Make sure you hit the correct base URL (some setups have PREFECT_API_URL already including /api).
- If it still won’t stick, we can do a surgical DB remediation: insert a new CANCELLED state row and point flow_runs.state_id to it. I can give you that SQL if needed; the API method is preferred.
Reduce the chance of recurrence
- These children are getting repeatedly set to CANCELLING because “The parent flow run was cancelled.” Once you force them to CANCELLED, the loop should stop.
- Ensure cancellations actually terminate child work on your infra (Docker/K8s/etc.); otherwise the server keeps reasserting CANCELLING.
- Consider a periodic job that:
  - For terminal runs, keeps only the current state (queries above without LIMIT)
  - For runs stuck in CANCELLING for > N minutes, forcibly sets CANCELLED via API
If you want, share the exact API URL you’re using and the response you get from the force call; I can sanity-check the endpoint and payload.
s
@Marvin
Copy code
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/srul.pinkas/.pyenv/versions/3.9.8/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/srul.pinkas/.pyenv/versions/3.9.8/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "<stdin>", line 2, in main
TypeError: __init__() missing 1 required positional argument: 'api'
m
thought for 59.5 seconds
That error is because in Prefect 2.x, PrefectClient requires an api argument. The easiest fix is to use get_client(), which reads PREFECT_API_URL from your env. Try one of these two options.
Option A: use get_client() (recommended)
Copy code
import asyncio
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

RUN_IDS = [
    "dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5",
    "d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079",
    # ...
]

async def main():
    async with get_client() as client:  # picks up PREFECT_API_URL
        for rid in RUN_IDS:
            state = Cancelled(name="Forced cancel")
            resp = await client.set_flow_run_state(rid, state=state, force=True)
            print("Forced CANCELLED:", rid, getattr(resp, "status", resp))

asyncio.run(main())
Option B: pass the API URL explicitly
Copy code
import os, asyncio
from prefect.client.orchestration import PrefectClient
from prefect.states import Cancelled

API = os.environ["PREFECT_API_URL"]  # e.g. http://<host>:4200/api
RUN_IDS = ["dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5", "d8c7c0cb-2909-4eb0-b3f1-fcbfb6a71079"]

async def main():
    async with PrefectClient(api=API) as client:
        for rid in RUN_IDS:
            state = Cancelled(name="Forced cancel")
            resp = await client.set_flow_run_state(rid, state=state, force=True)
            print("Forced CANCELLED:", rid, getattr(resp, "status", resp))

asyncio.run(main())
If you prefer cURL, you can force via the REST API as well (adjust base URL and auth as needed):
Copy code
curl -X POST "$PREFECT_API_URL/flow_runs/dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5/set_state?force=true" \
  -H "Content-Type: application/json" \
  -d '{"state": {"type":"CANCELLED","name":"Cancelled","message":"Forced cancel"}}'
If the forced set still doesn’t stick or returns a rejection, tell me the response you get and we’ll either adjust the request or do a surgical DB fix:
- Insert a CANCELLED state row, then point flow_runs.state_id to it:
WITH new_state AS (
  INSERT INTO flow_run_state (flow_run_id, type, name, message, timestamp)
  VALUES ('dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5', 'CANCELLED', 'Cancelled', 'Forced cancel', now())
  RETURNING id
)
UPDATE flow_runs r
SET state_id = new_state.id
FROM new_state
WHERE r.id = 'dd01ef7c-7ba6-4bce-9fb1-c1612bfe5ff5';
Then run your pruning queries to drop the hundreds of thousands of old CANCELLING rows and VACUUM.
Once you confirm the forced CANCELLED worked, I’d:
- Prune historical states (keep only the current state for terminal runs)
- Consider a small periodic job to do that pruning going forward
- Double-check your infra cancellation actually terminates child work, so the server doesn’t keep reasserting CANCELLING on children of a cancelled parent
If anything in the snippets above throws or returns a REJECT/WAIT status, paste it here and I’ll adjust the approach.
s
@Marvin worked! i see the status in UI is Forced Cancel (note that this is not CANCELLED - should we change anything in our scripts/sqls?). I ran it for all >200k top-offenders (30 runs). i also have about 100 runs with ~10k n (states-count), then the rest are single-digits. is this also super-weird (note that i sometimes have 50 sub-flows in my runs, and also sometimes i wait about a minute for a machine to be up - so that's a few states i always get) and i should fix them? or only the 200k and up, which i did?
m
thought for 32.8 seconds
Nice — glad the force worked.
About “Forced Cancel” vs CANCELLED
- Prefect states have a type and a display name. You set a Cancelled state with name “Forced cancel”, so:
  - type = CANCELLED
  - name = Forced cancel (what the UI shows)
- You do not need to change anything if your scripts/SQL filter by state type. If you’re checking the name, switch to type.
- Good pattern: WHERE type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
Should you fix the ~100 runs with ~10k states?
- 10k states/run is still very high. It’s worth investigating, but you don’t have to stop everything for them right now.
- First, prune history so they don’t waste space:
  - The “keep only the current state for terminal runs” query will clean these too if they’re terminal.
  - For non-terminal runs stuck in CANCELLING, keep only the latest CANCELLING state.
Here are the cleanup queries using your singular table names.
Prune terminal runs (keep only current terminal state)
Copy code
WITH doomed AS (
  SELECT s.id
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
Copy code
WITH doomed AS (
  SELECT s.id
  FROM task_run_state s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
)
DELETE FROM task_run_state
WHERE id IN (SELECT id FROM doomed);
Run repeatedly until 0 rows deleted; then:
Copy code
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
For runs still in CANCELLING: keep only the latest CANCELLING state
Copy code
-- Ensure flow_runs.state_id points to the latest CANCELLING
WITH latest AS (
  SELECT DISTINCT ON (s.flow_run_id) s.id, s.flow_run_id, s.timestamp
  FROM flow_run_state s
  WHERE s.type = 'CANCELLING'
  ORDER BY s.flow_run_id, s.timestamp DESC
)
UPDATE flow_runs r
SET state_id = latest.id
FROM latest, flow_run_state cur
WHERE r.id = latest.flow_run_id
  AND cur.id = r.state_id
  AND cur.type = 'CANCELLING'
  AND r.state_id <> latest.id;

-- Delete older CANCELLING rows
WITH latest AS (
  SELECT flow_run_id, MAX(timestamp) AS ts
  FROM flow_run_state
  WHERE type = 'CANCELLING'
  GROUP BY flow_run_id
),
doomed AS (
  -- pick the batch here: Postgres DELETE has no LIMIT clause
  SELECT s.id
  FROM flow_run_state s
  JOIN latest l ON l.flow_run_id = s.flow_run_id
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE s.type = 'CANCELLING'
    AND cur.type = 'CANCELLING'
    AND s.timestamp < l.ts
  LIMIT 50000
)
DELETE FROM flow_run_state
WHERE id IN (SELECT id FROM doomed);
Diagnose why those 10k runs ballooned - Check the state-type distribution on a few of them. If it’s mostly CANCELLING/RETRYING/RUNNING flips, there’s a behavioral loop to address.
Copy code
SELECT type, COUNT(*)
FROM flow_run_state
WHERE flow_run_id = '<run_id_with_10k_states>'
GROUP BY type
ORDER BY COUNT(*) DESC;
- Common causes:
  - Parent cancellation cascading to many children that never finalize, repeatedly reasserting CANCELLING
  - Very aggressive retries or short retry delays causing rapid churn
  - Infrastructure not actually terminating work on cancel (server keeps re-requesting cancel)
  - Automation repeatedly calling cancel on the same runs
Prevention
- Keep the pruning job as a periodic maintenance task (a minimal sketch follows below).
- Ensure cancellations on your infra truly terminate processes for subflows/tasks.
- Tame retry policies and add sensible timeouts to flows/tasks to avoid long “stuck” runs.
Once your cleanup is done and things stabilize, you can dial back any temporary scheduler/DB tweaks you set earlier. If you want, share one 10k run’s state distribution and I’ll help pinpoint the cause and suggest exact guardrails.
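As a sketch of that periodic prune job (assumptions: psycopg2 is installed and a PREFECT_DB_DSN env var holds the Postgres DSN — both names are placeholders, not Prefect-provided), deployable on a daily/weekly schedule:
```
import os
import psycopg2
from prefect import flow, get_run_logger

# Batched, ctid-based prune of historical flow_run_state rows for terminal runs
# (mirrors the chunked delete pattern above; singular table names assumed).
PRUNE_SQL = """
DELETE FROM flow_run_state s
USING (
  SELECT s.ctid
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
"""

@flow
def prune_state_history():
    logger = get_run_logger()
    with psycopg2.connect(os.environ["PREFECT_DB_DSN"]) as conn:
        with conn.cursor() as cur:
            while True:
                cur.execute(PRUNE_SQL)
                deleted = cur.rowcount
                conn.commit()  # commit each batch to keep locks short
                logger.info("flow_run_state rows pruned: %s", deleted)
                if deleted == 0:
                    break

if __name__ == "__main__":
    prune_state_history()
```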
s
@Marvin the deletion takes forever. 50k takes about a minute (tried to raise to 300k in a single deletion, and it took 10min). probably it's the big temp-table that is blocking it.. i think i have ~4M records (!) to delete, so windows of 100k is quite a lot of queries (tried to count it but it also took minutes so i dropped the query). can i do it in some kind of loop?
m
thought for 76.5 seconds
You’re right: big one-shot deletes get slower as the planner builds large temp sets and you hold long locks. Two things will help a lot:
- Make the delete selection cheap (use indexed joins and avoid large materializations)
- Commit between batches
Prep: add the cheap join indexes if they’re missing
Copy code
-- For flow states
CREATE INDEX IF NOT EXISTS idx_flow_run_state_flow_run_id ON flow_run_state (flow_run_id);
CREATE INDEX IF NOT EXISTS idx_flow_runs_state_id ON flow_runs (state_id);

-- For task states
CREATE INDEX IF NOT EXISTS idx_task_run_state_task_run_id ON task_run_state (task_run_id);
CREATE INDEX IF NOT EXISTS idx_task_runs_state_id ON task_runs (state_id);

ANALYZE flow_run_state; ANALYZE flow_runs;
ANALYZE task_run_state; ANALYZE task_runs;
Fast chunked delete pattern (no CTE, use ctid and commit each batch)
- This pattern avoids materializing a huge CTE and lets you run many small commits. Run these repeatedly until 0 rows are deleted each time. Adjust chunk size to what your DB tolerates (start 20k–50k).
Flow run states (keep only current state for terminal runs)
Copy code
DELETE FROM flow_run_state s
USING (
  SELECT s.ctid
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
) d
WHERE s.ctid = d.ctid;
Task run states
Copy code
DELETE FROM task_run_state s
USING (
  SELECT s.ctid
  FROM task_run_state s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
) d
WHERE s.ctid = d.ctid;
How to loop with commits
- Easiest is a tiny client script that repeats the delete until it returns 0 rows, committing each batch. Example with psycopg2 (commit per batch):
Copy code
import os, time, psycopg2

dsn = os.environ["PREFECT_DB_DSN"]  # e.g. postgresql://user:pass@host:5432/db
chunk = 50000

delete_flow_sql = f"""
DELETE FROM flow_run_state s
USING (
  SELECT s.ctid
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT {chunk}
) d
WHERE s.ctid = d.ctid;
"""

delete_task_sql = f"""
DELETE FROM task_run_state s
USING (
  SELECT s.ctid
  FROM task_run_state s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT {chunk}
) d
WHERE s.ctid = d.ctid;
"""

with psycopg2.connect(dsn) as conn:
    conn.autocommit = False
    with conn.cursor() as cur:
        while True:
            cur.execute(delete_flow_sql)
            deleted = cur.rowcount
            conn.commit()
            print("flow_run_state deleted:", deleted)
            if deleted == 0:
                break
            time.sleep(0.2)

        while True:
            cur.execute(delete_task_sql)
            deleted = cur.rowcount
            conn.commit()
            print("task_run_state deleted:", deleted)
            if deleted == 0:
                break
            time.sleep(0.2)
Alternative: temp table of terminal runs for a faster join
- If you have many non-terminal runs, precomputing terminal runs can make deletes cheaper:
```
CREATE TEMP TABLE term_flow_runs AS
SELECT r.id AS flow_run_id, r.state_id AS keep_state_id
FROM flow_runs r
JOIN flow_run_state cur ON cur.id = r.state_id
WHERE cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED');

CREATE INDEX ON term_flow_runs (flow_run_id);
CREATE INDEX ON term_flow_runs (keep_state_id);
ANALYZE term_flow_runs;

-- Then chunked delete becomes:
DELETE FROM flow_run_state s
USING (
  SELECT s.ctid
  FROM flow_run_state s
  JOIN term_flow_runs t ON t.flow_run_id = s.flow_run_id
  WHERE s.id <> t.keep_state_id
  LIMIT 50000
) d
WHERE s.ctid = d.ctid;
```
- Do the same for task runs if needed (a mirrored sketch follows at the end of this message).

Session tweaks while deleting
- Run during off-peak if you can.
- Consider temporarily raising statement_timeout for your session or disabling it:
```
SET LOCAL statement_timeout = '0';   -- no timeout for the maintenance session
SET LOCAL lock_timeout = '2s';       -- don’t wait forever on conflicting locks
SET LOCAL application_name = 'prefect-prune';
```
After the big delete
```
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- To reclaim disk (requires locks/downtime):
VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
-- Or use pg_repack for online compaction if available
```
How long will ~4M rows take?
- If you’re getting ~50k/min with small locks/commits, you’re looking at around 80 minutes for 4M — that’s reasonable. Larger batches can backfire due to memory and longer locks. The chunk+commit approach typically gives the best wall-clock with the least impact on other workload.
If you hit any lock waits or see the delete still scanning too much, share EXPLAIN for the chunked delete and we’ll tune indexes further.
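The task-run variant of the temp-table approach mentioned above would look like this (a mirrored sketch using the same pattern and the singular table names):
```
CREATE TEMP TABLE term_task_runs AS
SELECT r.id AS task_run_id, r.state_id AS keep_state_id
FROM task_runs r
JOIN task_run_state cur ON cur.id = r.state_id
WHERE cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED');

CREATE INDEX ON term_task_runs (task_run_id);
CREATE INDEX ON term_task_runs (keep_state_id);
ANALYZE term_task_runs;

-- Chunked delete of historical task states, keeping the current one per run
DELETE FROM task_run_state s
USING (
  SELECT s.ctid
  FROM task_run_state s
  JOIN term_task_runs t ON t.task_run_id = s.task_run_id
  WHERE s.id <> t.keep_state_id
  LIMIT 50000
) d
WHERE s.ctid = d.ctid;
```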
s
@Marvin is ctid or id the field i should use? the python and sql snippets are different
m
thought for 38.0 seconds
Great question — both patterns work, but they serve different purposes:
- Using id: stable, simple, relies on an index on id. Good if your join is selective and the PK/index is efficient.
- Using ctid: Postgres-only “row pointer” that’s very fast for chunked deletes because we select the exact rows to delete and match them by ctid in the same statement. Don’t store ctids; only use them inside the same statement.
Given your volume, use the ctid pattern for speed. Here are consistent, singular-table snippets you can run in a loop (commit between batches).
Prep (once)
Copy code
CREATE INDEX IF NOT EXISTS idx_flow_run_state_flow_run_id ON flow_run_state (flow_run_id);
CREATE INDEX IF NOT EXISTS idx_flow_runs_state_id ON flow_runs (state_id);

CREATE INDEX IF NOT EXISTS idx_task_run_state_task_run_id ON task_run_state (task_run_id);
CREATE INDEX IF NOT EXISTS idx_task_runs_state_id ON task_runs (state_id);

ANALYZE flow_run_state; ANALYZE flow_runs;
ANALYZE task_run_state; ANALYZE task_runs;
Chunked delete using ctid (terminal runs: keep only current state)
Copy code
DELETE FROM flow_run_state s
USING (
  SELECT s.ctid
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
Copy code
DELETE FROM task_run_state s
USING (
  SELECT s.ctid
  FROM task_run_state s
  JOIN task_runs r ON r.id = s.task_run_id
  JOIN task_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
) pick
WHERE s.ctid = pick.ctid;
If you prefer the id variant (slightly simpler, may be a bit slower):
Copy code
DELETE FROM flow_run_state s
USING (
  SELECT s.id
  FROM flow_run_state s
  JOIN flow_runs r ON r.id = s.flow_run_id
  JOIN flow_run_state cur ON cur.id = r.state_id
  WHERE r.state_id IS NOT NULL
    AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
    AND s.id <> r.state_id
  LIMIT 50000
) pick
WHERE s.id = pick.id;
Pure SQL loop (runs on the server; stops when a batch deletes 0 rows)
Copy code
DO $$
DECLARE
  n_deleted BIGINT;  -- named so it does not clash with the table alias r used below
BEGIN
  LOOP
    DELETE FROM flow_run_state s
    USING (
      SELECT s.ctid
      FROM flow_run_state s
      JOIN flow_runs r ON r.id = s.flow_run_id
      JOIN flow_run_state cur ON cur.id = r.state_id
      WHERE r.state_id IS NOT NULL
        AND cur.type IN ('COMPLETED','FAILED','CANCELLED','CRASHED')
        AND s.id <> r.state_id
      LIMIT 50000
    ) pick
    WHERE s.ctid = pick.ctid;

    GET DIAGNOSTICS n_deleted = ROW_COUNT;
    RAISE NOTICE 'flow_run_state deleted %', n_deleted;
    IF n_deleted = 0 THEN EXIT; END IF;
  END LOOP;
END $$;
Session tips while running batches
Copy code
SET LOCAL application_name = 'prefect-prune';
SET LOCAL lock_timeout = '2s';
SET LOCAL statement_timeout = '0';  -- just for this maintenance session
After you finish
Copy code
VACUUM (ANALYZE) flow_run_state;
VACUUM (ANALYZE) task_run_state;
-- To reclaim disk (locks/downtime):
-- VACUUM FULL flow_run_state; VACUUM FULL task_run_state;
TL;DR: Use the ctid-based chunked delete inside a loop (or the DO $$ loop above). It’s normal that each 50k batch takes ~1 minute on large tables; the loop with commits will still finish faster overall and with less lock impact than giant single-shot deletes. Adjust the LIMIT up/down to hit your sweet spot.
s
@Marvin i did it all, deleted over 90% of the records and did vacuum - but according to DBeaver i still see 9GB next to the tables. am i doing something wrong?
m
thought for 51.8 seconds
You didn’t do anything wrong. VACUUM (even with ANALYZE) does not shrink on-disk files; it only marks space for reuse inside the table. That’s why DBeaver still shows ~9 GB. To actually reduce the table file size you have to rewrite the table.
What to do next
1) Verify where the size is (heap vs indexes vs toast)
Copy code
SELECT relname,
  pg_size_pretty(pg_relation_size(oid))    AS table_size,
  pg_size_pretty(pg_indexes_size(oid))     AS index_size,
  pg_size_pretty(pg_total_relation_size(oid)) AS total_size
FROM pg_class
WHERE relname IN ('flow_run_state','task_run_state');
2) If you need the physical files to shrink: - Option A (simple, requires downtime/locks): VACUUM FULL rewrites and shrinks
Copy code
VACUUM FULL flow_run_state;
VACUUM FULL task_run_state;
This takes an exclusive lock per table. Do it in a maintenance window (stop Prefect API/services and workers while it runs).
- Option B (online): pg_repack
  - Rewrites tables and indexes online with minimal blocking. Requires installing/using the pg_repack client and extension.
  - Docs: https://reorg.github.io/pg_repack/
  - Good when you can’t take locks but want disk back.
- Option C (manual rewrite): create a new table, copy rows, swap, drop old
  - More operational work (constraints, indexes, FKs), but also shrinks files.
3) Don’t forget indexes
- Indexes can be a big chunk of that 9 GB. If you don’t do VACUUM FULL or pg_repack, you can at least shrink indexes:
Copy code
REINDEX TABLE CONCURRENTLY flow_run_state;
REINDEX TABLE CONCURRENTLY task_run_state;
Important note for RDS costs
- Even if you shrink table files, RDS allocated storage generally cannot be decreased in place. You’ll reclaim internal free space (future growth uses it), but your storage bill doesn’t drop unless you snapshot/restore into a smaller volume. AWS docs: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_Storage.html
Bottom line
- Your deletes + VACUUM were correct for performance.
- To see DBeaver’s size drop, run VACUUM FULL (with a lock/maintenance window) or use pg_repack.
- If the goal is to lower AWS storage cost, plan a snapshot/restore to a smaller volume after compaction, since RDS doesn’t shrink allocated storage in place.
If you can schedule a window, I’d:
- Stop Prefect API/services and workers (a kubectl sketch follows below)
- VACUUM FULL the two states tables (and optionally REINDEX others if needed)
- Start services back up
I’m happy to help plan the exact maintenance steps for your deployment (K8s/Docker/etc.).
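For the “stop services, compact, restart” window on Kubernetes, a minimal sketch (the deployment names prefect-server and prefect-worker are placeholders — use whatever your manifests/Helm chart actually call them):
```
# Scale the API/services and workers to zero for the window (names are hypothetical)
kubectl scale deployment/prefect-server --replicas=0
kubectl scale deployment/prefect-worker --replicas=0

# ...run VACUUM FULL / REINDEX against the database...

# Bring everything back
kubectl scale deployment/prefect-server --replicas=1
kubectl scale deployment/prefect-worker --replicas=1
```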
s
@Marvin Two separate questions (still, self-hosted open-source prefect 2.20) • slack notifications: i use the built-in slack (token based) notification to a dedicated channel. it works well, except for the link - i get (with nice bullets and fonts) something like::
Copy code
Prefect flow run notification
Flow run xxx_flow/pi-sothis entered state Completed at 2025-10-12T13:09:43.553961+00:00.
Flow ID: 5653c626-455b-466a-944b-c8e72959e2da
Flow run ID: 388f3b8d-19f4-4ae6-bfb8-546b74869476
Flow run URL: <http://localhost:4200/flow-runs/flow-run/388f3b8d-19f4-4ae6-bfb8-546b74869476>
State message: All states completed.
but the localhost-link is really annoying (not clickable of course). i think i asked you this before - is there an easy env-parameter i can change in order to get the actual domain-URL so that the links would work? is this a k8s inner-OS thing i can't control? i'll type the second (unrelated) question in the next msg while you think
Well @Marvin question 2: we're trying to remove deployments in a job-clean-up (we create many deployments - for every branch, so we can try out runs even before merge, so we get lots of spammy deployments). the job is written as a gitlab-ci job that interacts with prefect through the Python API. the api works well locally, but in some weird way the gitlab-ci run only gets the first two deployments and that's it. so i guess it's not permissions / API mis-use / time-performance. Have you heard of such a weird behavior? anything i can do to debug this?
@Marvin you're alive?
m
thought for 7.1 seconds
I’m here and ready! How can I help with your Prefect workflows today?
s
re-pasting the two questions from yesterday @Marvin : Two separate questions (still, self-hosted open-source prefect 2.20) QUESTION 1: • slack notifications: i use the built-in slack (token based) notification to a dedicated channel. it works well, except for the link - i get (with nice bullets and fonts) something like::
Copy code
Prefect flow run notification
Flow run xxx_flow/pi-sothis entered state Completed at 2025-10-12T13:09:43.553961+00:00.
Flow ID: 5653c626-455b-466a-944b-c8e72959e2da
Flow run ID: 388f3b8d-19f4-4ae6-bfb8-546b74869476
Flow run URL: <http://localhost:4200/flow-runs/flow-run/388f3b8d-19f4-4ae6-bfb8-546b74869476>
State message: All states completed.
but the localhost-link is really annoying (not clickable of course). i think i asked you this before - is there an easy env-parameter i can change in order to get the actual domain-URL so that the links would work? is this a k8s inner-OS thing i can't control?
QUESTION 2: we're trying to remove deployments in a job-clean-up (we create many deployments - for every branch, so we can try out runs even before merge, so we get lots of spammy deployments). the job is written as a gitlab-ci job that interacts with prefect through the Python API. the api works well locally, but in some weird way the gitlab-ci run only gets the first two deployments and that's it. so i guess it's not permissions / API mis-use / time-performance. Have you heard of such a weird behavior? anything i can do to debug this?
m
thought for 165.2 seconds
Absolutely — and thanks for the details. I’ll address both for Prefect OSS 2.20.
QUESTION 1 (Slack links show http://localhost:4200)
This is controlled by Prefect’s UI base URL setting. Set PREFECT_UI_URL to your externally reachable domain (and include any path prefix if you’re serving Prefect under a subpath). Example values:
- https://prefect.mycompany.com
- https://prefect.mycompany.com/prefect (if you reverse proxy under /prefect)
Make sure this env var is set in the Prefect API/Server pod/container (the component that emits notifications), not just your local machine. Once set, new notifications will include correct, clickable URLs. It’s not a K8s limitation — it’s a Prefect config defaulting to localhost when unset. (A quick way to verify the value inside the pod is sketched after the examples below.)
Helpful docs:
- Prefect 2 settings guide
- Self-hosting Prefect 2
Quick examples:
- Kubernetes (Deployment env):
Copy code
- name: PREFECT_UI_URL
  value: "<https://prefect.mycompany.com>"
- Docker Compose:
Copy code
environment:
  - PREFECT_UI_URL=https://prefect.mycompany.com
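To double-check that the running API actually picked up the value, you can inspect the effective settings inside the container (the deployment name prefect-server is a placeholder — adjust to your release):
```
# "prefect-server" is hypothetical; use your actual Deployment name
kubectl exec deploy/prefect-server -- prefect config view | grep PREFECT_UI_URL
```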
QUESTION 2 (CI job only “gets two” deployments)
I haven’t seen a deterministic “2 only” bug, but a few common culprits explain this behavior:
- Pagination not handled: Prefect’s deployments list is paginated. If your code doesn’t paginate explicitly, you’ll only get the first page (the default limit can vary by method/environment).
- Different API target in CI: PREFECT_API_URL might be pointing to a different server/workspace in CI than locally.
- Filtering differences: Filters (name/prefix/tags) can narrow to just a couple items without being obvious.
- Timeout/partial iteration issues.
To make the cleanup robust, explicitly paginate and/or request a large page size:
Copy code
import anyio
from prefect.client.orchestration import get_client

async def list_all_deployments():
    async with get_client() as client:  # reads PREFECT_API_URL from the environment
        all_deps = []
        page_size = 200
        offset = 0
        while True:
            deps = await client.read_deployments(limit=page_size, offset=offset)
            if not deps:
                break
            all_deps.extend(deps)
            offset += len(deps)
        return all_deps

async def delete_branch_deployments(prefix: str):
    deps = await list_all_deployments()
    async with get_client() as client:
        for d in deps:
            if d.name.startswith(prefix):
                await client.delete_deployment(d.id)

anyio.run(delete_branch_deployments, "your-branch-prefix-")
Debug tips:
- Print the count you receive in CI vs local to confirm pagination differences.
- Ensure PREFECT_API_URL in CI points to the same server you expect.
- Turn on debug logs in CI to see the requests:
Copy code
export PREFECT_LOGGING_LEVEL=DEBUG
export PREFECT_LOGGING_EXTRA_LOGGERS=httpx
- Hit the REST endpoint directly to verify pagination: POST to /api/deployments/filter with {"limit": 200, "offset": 0} and compare total rows returned (a curl sketch follows below).
- API reference: Prefect 2 REST API
If you can share:
- The snippet you use to list/delete deployments
- The expected number of deployments vs what you see in CI
- The value of PREFECT_API_URL in CI
…I can help pinpoint whether it’s pagination, filtering, or environment targeting.
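For that REST check, a minimal curl sketch (assumes PREFECT_API_URL already ends with /api and that no auth header is needed on your self-hosted server):
```
# Count how many deployments the first page returns
curl -s -X POST "$PREFECT_API_URL/deployments/filter" \
  -H "Content-Type: application/json" \
  -d '{"limit": 200, "offset": 0}' \
  | python -c "import json,sys; print(len(json.load(sys.stdin)))"
```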