# prefect-community
d
Hi everybody. We're using Prefect to run continuous processing on IoT data. I've just had to clear out the Prefect database: it grew to 12GB in less than a month and stalled the VM it was on, so I had no choice but to scrap it. I am logging a LOT of output from the continuous tasks, so my guess is that the logs are to blame for the growth, and I can confirm that once it has been running for a few days. The volume is a few log lines every 5 seconds from each of 5 continuous tasks. Regarding flow deletion/cleanup, I've found a two-month-old thread here https://prefect-community.slack.com/archives/CL09KU1K7/p1598535130019500 that mentions running a mutation on the Hasura API to do deletions with a time filter, and here is the suggested issue: https://github.com/PrefectHQ/server/issues/62. @Dylan and @Sandeep Aggarwal, you both participated in that thread. I have little experience with running GraphQL mutations and no experience with the Hasura API, so I'm wondering if somebody has a working example of what to do here. Searches on GitHub and Slack for "clean"/"cleanup"/"database size" all return nothing, so I'm hoping we can have a thread here with a recipe. I have no problem doing the legwork myself, for example implementing a Prefect flow that makes the needed API calls to clean up old tasks. Any suggestions would be much appreciated.
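Since the linked thread suggests going through Hasura directly: Hasura auto-generates a `delete_<table>` mutation that takes a `where` filter and returns `affected_rows`, so a time-filtered bulk delete could look like the sketch below. It assumes Prefect Server's Hasura schema tracks the `flow_run` table (with the `state` and `updated` columns used in the flow posted later in this thread) and that the request goes to the Hasura endpoint itself, not the Prefect API, whose `delete_flow_run` mutation takes an `input` argument instead. `BULK_DELETE_MUTATION`, `build_bulk_delete_payload` and `send_to_hasura` are hypothetical names.

```python
import json
import urllib.request
from datetime import datetime, timedelta, timezone

# Hasura auto-generates delete_<table>(where: ...) mutations. ASSUMPTION: the
# Prefect Server Hasura schema exposes flow_run under that name, with `state`
# and `updated` (timestamptz) columns.
BULK_DELETE_MUTATION = """
mutation($cutoff: timestamptz!) {
  delete_flow_run(where: {
    _and: [
      {state: {_neq: "Scheduled"}},
      {updated: {_lt: $cutoff}}
    ]
  }) {
    affected_rows
  }
}
"""


def build_bulk_delete_payload(retention_days, now=None):
    """Build the JSON body for a time-filtered bulk delete sent to Hasura."""
    if now is None:
        now = datetime.now(timezone.utc)
    # An aware datetime keeps its "+00:00" offset in isoformat().
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    return {"query": BULK_DELETE_MUTATION, "variables": {"cutoff": cutoff}}


def send_to_hasura(payload, url):
    """POST the payload to a Hasura GraphQL endpoint and return the parsed JSON."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

Usage would be something like `send_to_hasura(build_bulk_delete_payload(30), url)`, where `url` points at your Hasura GraphQL endpoint; in a default Prefect Server docker-compose setup that may be `http://localhost:3000/v1alpha1/graphql`, but that is an assumption, so check your own configuration. Unlike a one-by-one approach, this removes everything matching the filter in a single statement, so it is worth running the equivalent `flow_run` query first to see what it would match.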
d
Hey @Dennis Schneidermann, welcome to the Community!
I’m not sure about the status of issue #62, but I’ll check in with the team.
d
Thanks, Dylan
d
In the meantime, if you’d like to write a flow, you can do the following:
1) Decide how long you’d like to keep your Flow Runs. Let’s say 30 days. I’d make this a default parameter.
2) Get all of the Flow Runs older than a certain date using the GraphQL API on your Prefect Server instance and return a list of `flow_run_id`s.
3) Map over the returned list and use the `delete_flow_run` mutation to delete them.
The first run may take a while, but if you then run it every day it should be quick, depending on your flow run volume.
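The three steps above can be sketched in plain Python, with the two GraphQL calls passed in as functions so the control flow is visible on its own. `cleanup_old_flow_runs`, `fetch_ids_before` and `delete_one` are hypothetical names, and the 30-day default just mirrors the example retention period; in an actual Prefect flow, step 3 would be a `.map` over a task rather than a list comprehension.

```python
from datetime import datetime, timedelta, timezone


def cleanup_old_flow_runs(fetch_ids_before, delete_one, retention_days=30, now=None):
    """Sketch of the recipe: compute a cutoff, fetch old run IDs, delete each one.

    fetch_ids_before(cutoff_iso) -> list of flow_run_id strings (hypothetical
        stand-in for the GraphQL query).
    delete_one(flow_run_id) -> bool, True if the deletion succeeded (hypothetical
        stand-in for the delete_flow_run mutation).
    Returns the cutoff used and the IDs that were deleted successfully.
    """
    # Step 1: the retention period is a default parameter.
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = (now - timedelta(days=retention_days)).isoformat()

    # Step 2: collect the IDs of flow runs older than the cutoff.
    flow_run_ids = fetch_ids_before(cutoff)

    # Step 3: delete them one at a time, keeping the ones that succeeded.
    deleted = [run_id for run_id in flow_run_ids if delete_one(run_id)]
    return cutoff, deleted
```

Because each scheduled run only has to handle the runs that expired since the previous one, the daily runs after the first stay small.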
The Prefect `Client` has a `.graphql` method that accepts a GraphQL query or mutation.
Try getting your queries working in the Interactive API first.
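Here is a sketch of the query side, assuming Prefect 0.x, where `prefect.Client.graphql` takes a query string plus a `variables` dict and returns a result whose `.data` attribute mirrors the query shape. The query string is also what you could paste into the Interactive API (with a variables object filled in) before wiring it into a flow; the filter that skips runs still in `Scheduled` is borrowed from the flow posted later in this thread. `EXPIRED_FLOW_RUNS_QUERY` and `fetch_expired_flow_run_ids` are hypothetical names.

```python
# Query to try in the Interactive API first, with variables such as
# {"updated_before": "2020-10-01T00:00:00+00:00"}.
EXPIRED_FLOW_RUNS_QUERY = """
query($updated_before: timestamptz) {
  flow_run(where: {
    _and: [
      {state: {_neq: "Scheduled"}},
      {updated: {_lt: $updated_before}}
    ]
  }) {
    id
  }
}
"""


def fetch_expired_flow_run_ids(client, updated_before):
    """Run the query through client.graphql and return a list of flow run IDs.

    ASSUMPTION: `client` is a prefect.Client (Prefect 0.x) whose .graphql()
    accepts a query string and a `variables` dict, and returns a result with
    attribute access mirroring the query (result.data.flow_run[i].id).
    """
    result = client.graphql(
        EXPIRED_FLOW_RUNS_QUERY, variables={"updated_before": updated_before}
    )
    return [flow_run.id for flow_run in result.data.flow_run]
```

Against a real server this would be called as `fetch_expired_flow_run_ids(prefect.Client(), cutoff)`; taking the client as an argument also makes it easy to exercise with a stub.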
d
Perfect, seems like all the tooling is already there to make it happen 🙂 Will work on it tomorrow as it's end of the day here 👍
d
Awesome! Let us know how it goes
And if you’d consider making your flow open source, I’m sure the community would appreciate it!
s
My flow looks exactly as @Dylan suggested. Just putting it here in case it helps, @Dennis Schneidermann:
import pendulum
import prefect
from dynaconf import settings  # provides RETENTION_DAYS and PREFECT_API_ENDPOINT
from prefect import schedules
from prefect.schedules import clocks

# Cron schedule to execute cleanup job. Currently set to run at 00:00 (UTC)
# every Sunday.
DATA_CLEANUP_SCHEDULE_CRON_STRING = "0 0 * * 0"


@prefect.task
def get_expired_flow_runs():
    """Return the IDs of flow runs that have expired under the retention policy."""

    logger = prefect.context.get("logger")

    query = {
        "query($updated_before: timestamptz)": {
            """
            flow_run(where: {
                _and: {
                    state: {_neq: "Scheduled"},
                    updated: {_lt: $updated_before}
                }
            })
            """: {
                "id"
            }
        }
    }

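    # Cutoff: start of the day RETENTION_DAYS ago, in UTC. isoformat() keeps
    # the "+00:00" offset, so Postgres doesn't interpret the value in its
    # session time zone when comparing against the timestamptz column.
    retention_period = (
        pendulum.now("UTC")
        .subtract(days=settings.RETENTION_DAYS)
        .start_of("day")
        .isoformat()
    )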

    client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    flow_runs = client.graphql(
        query, variables={"updated_before": retention_period}
    ).data.flow_run

    logger.info("Expired flow run count: %s", len(flow_runs))

    expired_flow_run_ids = [flow_run.id for flow_run in flow_runs]
    logger.debug("Expired flow run ids: %s", expired_flow_run_ids)
    return expired_flow_run_ids


@prefect.task
def delete_flow_run(flow_run_id):
    """
    Hits a mutation using Prefect client to delete flow run associated with
    given ID.
    As confirmed in this Slack thread:
    https://prefect-community.slack.com/archives/CL09KU1K7/p1598535130019500,
    it should be sufficient to delete just the flow run and database cascades
    should take care of all the related objects.

    Args:
        flow_run_id: ID of flow run to be deleted.
    """
    logger = prefect.context.get("logger")

    mutation = """
        mutation($input: delete_flow_run_input!) {
            delete_flow_run(input: $input) {
                success
            }
        }
    """

    client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    response = client.graphql(
        mutation, variables=dict(input=dict(flow_run_id=flow_run_id))
    )

    logger.debug("Flow run id: %s deletion success: %s", flow_run_id, response.data)

with prefect.Flow(
    "cleanup_expired_data",
    schedule=schedules.Schedule(
        clocks=[clocks.CronClock(DATA_CLEANUP_SCHEDULE_CRON_STRING)]
    ),
) as cleanup_expired_data:
    # The Prefect API doesn't currently support bulk deletion of flow runs, so
    # for now we delete them one by one until the issue below is resolved.
    # @see: https://github.com/PrefectHQ/server/issues/62
    expired_flow_runs = get_expired_flow_runs()
    delete_flow_run.map(expired_flow_runs)
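The `delete_flow_run` task above only logs `response.data` at debug level, so a deletion that comes back with `success: false` would go unnoticed. If you'd rather have that show up as a failed mapped task run, one option is a small check like the one below, called with the mutation response at the end of the task. `check_deletion` and `FlowRunDeletionError` are hypothetical names, and the `response.data.delete_flow_run.success` shape is assumed from the mutation's selection set.

```python
class FlowRunDeletionError(RuntimeError):
    """Raised when the delete_flow_run mutation reports success: false."""


def check_deletion(flow_run_id, response):
    """Raise if the delete_flow_run mutation did not report success.

    ASSUMPTION: `response` has the shape returned by client.graphql for the
    mutation above, i.e. response.data.delete_flow_run.success.
    """
    success = response.data.delete_flow_run.success
    if not success:
        raise FlowRunDeletionError(f"Deleting flow run {flow_run_id} failed")
    return success
```

Inside the task, `check_deletion(flow_run_id, response)` would go right after the `logger.debug` call; an exception raised there fails only that mapped child, so the other deletions still run.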
d
Much appreciated, @Sandeep Aggarwal, thanks 🙂
a
@Sandeep Aggarwal, my hero