    Dennis Schneidermann
    1 year ago
    Hi everybody. We're using Prefect to run continuous processing on IoT data. I've just had to clear out the Prefect database: it grew to 12GB in less than a month, stalled the VM it was on, and I had no choice but to scrap it. I'm logging a LOT of output from the continuous tasks, so my guess is that's to blame for the growth; I can confirm once I have it running again for a few days. The volume is a few log lines every 5 seconds from each of 5 continuous tasks. Regarding flow deletion/cleanup, I've found a 2-month-old thread here https://prefect-community.slack.com/archives/CL09KU1K7/p1598535130019500 that mentions running a mutation on the Hasura API to do deletions with a time filter, and here is the suggested issue: https://github.com/PrefectHQ/server/issues/62 - @Dylan and @Sandeep Aggarwal, you participated in that thread. I have little experience with running GraphQL mutations and no experience with the Hasura API, so I'm wondering if somebody has a working example of what to do here. Searches in GitHub and Slack for "clean"/"cleanup"/"database size" all give no results, so I'm hoping we can have a thread here with a recipe on what to do. I have no problem doing the legwork myself, for example implementing a Prefect flow that makes the needed API calls to clean up old tasks. Any suggestions would be much appreciated.
    Dylan
    1 year ago
    Hey @Dennis Schneidermann, welcome to the community!
    I’m not sure about the status of issue #62, but I will check in with the team.
    Dennis Schneidermann
    1 year ago
    Thanks Dylan
    Dylan
    1 year ago
    In the meantime, if you’d like to write a flow, you can do the following:
    1) Decide how long you’d like to keep your Flow Runs. Let’s say 30 days. I’d make this a default parameter.
    2) Get all of the Flow Runs older than that cutoff date using the GraphQL API on your Prefect Server instance and return a list of flow_run_ids.
    3) Map over the returned list and use the delete_flow_run mutation to delete them.
    The first time you run this it may take a while, but if you then run it every day it should be quick, depending on your flow run volume.
    The Prefect Client has a .graphql method that accepts a GraphQL query or mutation.
    Try getting your queries down in the interactive API first.
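    For reference, here is a rough sketch of steps 2) and 3) using the Client's .graphql method (the cutoff timestamp is just a placeholder, and the Client is assumed to pick up your Server's GraphQL endpoint from your Prefect config):
    import prefect

    # The Client reads the Server's GraphQL endpoint from your Prefect config;
    # you can also pass it explicitly via Client(api_server=...).
    client = prefect.Client()

    # Step 2: fetch the ids of flow runs last updated before a cutoff.
    # "2020-10-01T00:00:00+00:00" is only a placeholder date.
    runs = client.graphql(
        """
        query {
            flow_run(where: {updated: {_lt: "2020-10-01T00:00:00+00:00"}}) {
                id
            }
        }
        """
    )

    # Step 3: delete each returned flow run with the delete_flow_run mutation.
    delete_mutation = """
        mutation($input: delete_flow_run_input!) {
            delete_flow_run(input: $input) {
                success
            }
        }
    """
    for run in runs.data.flow_run:
        client.graphql(delete_mutation, variables={"input": {"flow_run_id": run.id}})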
    Dennis Schneidermann
    1 year ago
    Perfect, seems like all the tooling is already there to make it happen 🙂 Will work on it tomorrow as it's the end of the day here 👍
    Dylan
    1 year ago
    Awesome! Let us know how it goes
    And if you’d consider making your flow open source, I’m sure the community would appreciate it!
    Sandeep Aggarwal
    1 year ago
    My flow looks exactly like what @Dylan suggested. Just putting it here in case it helps @Dennis Schneidermann:
    import pendulum
    import prefect
    from dynaconf import settings
    from prefect import schedules
    from prefect.schedules import clocks
    
    # Cron schedule to execute cleanup job. Currently set to run at 00:00 (UTC)
    # every Sunday.
    DATA_CLEANUP_SCHEDULE_CRON_STRING = "0 0 * * 0"
    
    
    @prefect.task
    def get_expired_flow_runs():
        """Returns list of flow run ID that are expired as per retention policy."""
    
        logger = prefect.context.get("logger")
    
        query = {
            "query($updated_before: timestamptz)": {
                """
                flow_run(where: {
                    _and: {
                        state: {_neq: "Scheduled"},
                        updated: {_lt: $updated_before}
                    }
                })
                """: {
                    "id"
                }
            }
        }
    
        retention_period = (
            pendulum.now("UTC")
            .subtract(days=settings.RETENTION_DAYS)
            .start_of("day")
            .to_datetime_string()
        )
    
        client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
        flow_runs = client.graphql(
            query, variables={"updated_before": retention_period}
        ).data.flow_run
    
        logger.info("Expired flow run count: %s", len(flow_runs))
    
        expired_flow_run_ids = [flow_run.id for flow_run in flow_runs]
        logger.debug("Expired flow run ids: %s", expired_flow_run_ids)
        return expired_flow_run_ids
    
    
    @prefect.task
    def delete_flow_run(flow_run_id):
        """
        Hits a mutation using the Prefect client to delete the flow run associated
        with the given ID.

        As per confirmation in this Slack thread:
        https://prefect-community.slack.com/?redir=%2Farchives%2FCL09KU1K7%2Fp1598535130019500,
        it should be sufficient to delete just the flow run; database cascades
        should take care of all the related objects.
    
        Args:
            flow_run_id: ID of flow run to be deleted.
        """
        logger = prefect.context.get("logger")
    
        mutation = """
            mutation($input: delete_flow_run_input!) {
                delete_flow_run(input: $input) {
                    success
                }
            }
        """
    
        client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
        response = client.graphql(
            mutation, variables=dict(input=dict(flow_run_id=flow_run_id))
        )
    
        logger.debug("Flow id: %s deletion success: %s", flow_run_id, response.data)
    
    with prefect.Flow(
        "cleanup_expired_data",
        schedule=schedules.Schedule(
            clocks=[clocks.CronClock(DATA_CLEANUP_SCHEDULE_CRON_STRING)]
        ),
    ) as cleanup_expired_data:
        # The Prefect API currently doesn't support bulk deletion of flow runs, so
        # for now we delete them one by one until the issue below is resolved.
        # @see: https://github.com/PrefectHQ/server/issues/62
        expired_flow_runs = get_expired_flow_runs()
        delete_flow_run.map(expired_flow_runs)
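    If you want to run it against the server, registering it is the only extra step. A minimal sketch, assuming the server is up and a project with the placeholder name "maintenance" already exists:
    if __name__ == "__main__":
        # Register the flow so the cron schedule above is picked up by the backend.
        # "maintenance" is a placeholder project name; use whatever project you have.
        cleanup_expired_data.register(project_name="maintenance")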
    Dennis Schneidermann
    1 year ago
    Much appreciated @Sandeep Aggarwal, thanks 🙂
    Alexander
    1 year ago