Dennis Schneidermann
10/22/2020, 2:31 PMDylan
Dennis Schneidermann
10/22/2020, 2:36 PMDylan
flow_run_id
s
3) Map over the returned list and use the `delete_flow_run` mutation to delete them.
`graphql` method that accepts a GraphQL query or mutation
Dennis Schneidermann
10/22/2020, 2:39 PMDylan
Sandeep Aggarwal
10/22/2020, 3:05 PM
import pendulum
import prefect
from dynaconf import settings
from prefect import schedules
from prefect.schedules import clocks
# Cron schedule on which the cleanup job executes. Fields are: minute 0,
# hour 0, any day of month, any month, day-of-week 0 -- i.e. currently
# 00:00 (UTC) every Sunday.
DATA_CLEANUP_SCHEDULE_CRON_STRING = "0 0 * * 0"
@prefect.task
def get_expired_flow_runs():
    """Return the IDs of flow runs that are expired as per the retention policy.

    A flow run is selected when its state is not ``"Scheduled"`` and its
    ``updated`` timestamp is earlier than the cutoff, which is the start of
    the day (UTC) ``settings.RETENTION_DAYS`` days before now.

    Returns:
        list: The ``id`` of every flow run returned by the query.
    """
    logger = prefect.context.get("logger")

    # GraphQL document in dict form; the cutoff is passed as the
    # ``$updated_before`` variable rather than interpolated into the text.
    query = {
        "query($updated_before: timestamptz)": {
            """
            flow_run(where: {
                _and: {
                    state: {_neq: "Scheduled"},
                    updated: {_lt: $updated_before}
                }
            })
            """: {
                "id"
            }
        }
    }

    # Cutoff timestamp: midnight UTC, RETENTION_DAYS days ago.
    # NOTE(review): the formatted string presumably carries no UTC offset, so
    # the server would interpret it in its own time zone -- confirm it is UTC.
    updated_before = (
        pendulum.now("UTC")
        .subtract(days=settings.RETENTION_DAYS)
        .start_of("day")
        .to_datetime_string()
    )

    client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    flow_runs = client.graphql(
        query, variables={"updated_before": updated_before}
    ).data.flow_run

    logger.info("Expired flow run count: %s", len(flow_runs))
    expired_flow_run_ids = [flow_run.id for flow_run in flow_runs]
    logger.debug("Expired flow run ids: %s", expired_flow_run_ids)
    return expired_flow_run_ids
@prefect.task
def delete_flow_run(flow_run_id):
    """Delete the flow run with the given ID via the Prefect GraphQL API.

    Sends the ``delete_flow_run`` mutation through the Prefect client.

    As per confirmation in this Slack thread:
    https://prefect-community.slack.com/?redir=%2Farchives%2FCL09KU1K7%2Fp1598535130019500
    it should be sufficient to delete just the flow run; database cascades
    should take care of all the related objects.

    Args:
        flow_run_id: ID of the flow run to be deleted.
    """
    logger = prefect.context.get("logger")

    mutation = """
        mutation($input: delete_flow_run_input!) {
            delete_flow_run(input: $input) {
                success
            }
        }
    """

    client = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    response = client.graphql(
        mutation, variables={"input": {"flow_run_id": flow_run_id}}
    )
    logger.debug("Flow id: %s deletion success: %s", flow_run_id, response.data)

    # Surface an unsuccessful deletion above debug level; deliberately do not
    # raise, so one failed run does not change the task's outcome.
    if not response.data.delete_flow_run.success:
        logger.warning("Flow run %s was not deleted", flow_run_id)
# Flow that looks up expired flow runs and deletes them, executed on the
# cron schedule held in DATA_CLEANUP_SCHEDULE_CRON_STRING.
with prefect.Flow(
    "cleanup_expired_data",
    schedule=schedules.Schedule(
        clocks=[clocks.CronClock(DATA_CLEANUP_SCHEDULE_CRON_STRING)]
    ),
) as cleanup_expired_data:
    # Prefect API currently doesn't support bulk deletion of flow runs. So,
    # for now we delete them one by one (one mapped task run per ID) until
    # the bug below is resolved.
    # See: https://github.com/PrefectHQ/server/issues/62
    expired_flow_runs = get_expired_flow_runs()
    delete_flow_run.map(expired_flow_runs)
Dennis Schneidermann
10/22/2020, 3:26 PMAlexander
10/23/2020, 7:35 PM