datamongus
08/25/2024, 10:00 PM
Marvin
08/25/2024, 10:01 PM
bash
pip install prefect
2. Prefect Flow for Deleting Completed Task Runs:
python
import asyncio
from datetime import datetime, timedelta

from prefect import flow, task, get_client


@task
async def delete_old_task_runs():
    async with get_client() as client:
        # Calculate the cutoff datetime
        cutoff = datetime.utcnow() - timedelta(days=3)
        # Fetch all task runs in completed state older than cutoff
        task_runs = await client.read_task_runs(where={
            "state": {"type": "COMPLETED"},
            "end_time": {"lt": cutoff.isoformat()}
        })
        # Delete the task runs
        for task_run in task_runs:
            await client.delete_task_run(task_run.id)


@flow(log_prints=True)
async def cleanup_flow():
    await delete_old_task_runs()


if __name__ == "__main__":
    # An async flow returns a coroutine, so it has to be driven by an event loop
    asyncio.run(cleanup_flow())
3. Run the Flow:
You can run the flow manually:
bash
python your_script_name.py
Make sure to replace "your_script_name.py" with the actual script file name.
Feel free to review the details and make necessary customizations according to your Prefect version and setup. For any further details, you might want to refer to the Prefect documentation.
If you need more specific examples, you can explore Prefect code examples.
Nate
08/25/2024, 11:40 PM
where
you'd use FlowRunFilter(state=dict(type=dict(any_=["COMPLETED"])), start_time=...)
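For reference, a minimal sketch of what that filter-based version could look like. It targets flow runs because that is the filter named above, and it keeps the 3-day window from Marvin's snippet. The function names `completed_before` and `delete_old_completed_flow_runs` are made up for illustration, and the `read_flow_runs(flow_run_filter=...)` / `delete_flow_run(...)` calls assume a reasonably recent Prefect client, so check them against your installed version.

```python
import asyncio
from datetime import datetime, timedelta, timezone


def completed_before(cutoff: datetime) -> dict:
    """Keyword arguments for a FlowRunFilter: COMPLETED runs that started before `cutoff`."""
    return {
        "state": {"type": {"any_": ["COMPLETED"]}},
        "start_time": {"before_": cutoff},
    }


async def delete_old_completed_flow_runs(days: int = 3) -> int:
    """Delete COMPLETED flow runs older than `days` days and return how many were deleted."""
    # Imported here so the helper above stays usable without Prefect installed.
    from prefect import get_client
    from prefect.client.schemas.filters import FlowRunFilter  # client schemas, not server

    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    flow_run_filter = FlowRunFilter(**completed_before(cutoff))

    deleted = 0
    async with get_client() as client:
        # Assumption: a single call returns one page of results, so a large
        # backlog may need this function to be run more than once.
        flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
        for flow_run in flow_runs:
            await client.delete_flow_run(flow_run.id)
            deleted += 1
    return deleted
```

To try it from a script, something like `asyncio.run(delete_old_completed_flow_runs(days=3))` should do, with the API URL configured as usual for your Prefect setup.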
datamongus
08/25/2024, 11:41 PM
Nate
08/25/2024, 11:43 PM
from prefect.client.schemas.filters import FlowRunFilter
fwiw: to answer "how to import X?" in general i'd suggest
• do a github search like this
• use client schemas, not server schemas (unless you're contributing a server component)
datamongus
08/25/2024, 11:44 PM