Priyank
07/18/2022, 1:03 PMSylvain Hazard
07/18/2022, 1:10 PM@task(name="Delete old flow runs")
def delete_old_runs(threshold_in_days: int):
if threshold_in_days <= 0:
raise ValueError(
"You should set a value for `threshold_in_days` strictly above zero."
)
target_datetime = datetime.now() - timedelta(days=threshold_in_days)
client = prefect.Client()
flow_runs = client.graphql(
{
"query": {
f'flow_run (where: {{start_time: {{_lt: "{target_datetime}"}}}})': [
"id",
"start_time",
{"flow": {"name"}},
]
}
}
)
prefect.context.get("logger").info(
f"Deleting {len(flow_runs['data']['flow_run'])} flow runs."
)
for flow_run in flow_runs["data"]["flow_run"]:
prefect.context.get("logger").info(
client.graphql(
{
"mutation": {
f"delete_flow_run(input: {{flow_run_id: \"{flow_run.get('id')}\"}})": {
"success",
"error",
}
}
}
)
)
Priyank
07/19/2022, 1:21 PMSylvain Hazard
07/19/2022, 1:34 PMclient = prefect.Client()
flow_runs = client.graphql(
YOUR_QUERY
)
for flow_run in flow_runs["data"]["flow_run"]:
start_time = flow_run.get("start_time")
end_time = flow_run.get("end_time")
duration = end_time - start_time
if duration > DURATION_THRESHOLD:
# do stuff
You'd probably have to account for flows that are still running because they won't have an end_time but it should be doable.Kevin Kho
07/19/2022, 2:30 PM