Dijar Vrella
08/17/2024, 7:20 PMMarvin
08/17/2024, 7:20 PMinspect
command can generate JSON output using a flag:
bash
prefect work-pool inspect <name> --output=json
or
bash
prefect work-pool inspect <name> -o json
This feature aims to improve automation and scripting capabilities by allowing CLI command outputs to be easily parsed and utilized as inputs for other processes.
Refer to the relevant GitHub Issue for more details.
For more information on CLI commands and their options, you can also check the official Prefect documentation.Dijar Vrella
08/17/2024, 7:21 PMMarvin
08/17/2024, 7:21 PMprefect flow-run ls
command does not support a JSON output option directly. This feature is available for some commands, but flow-run ls
is not one of them at the moment.
For more details on related feature requests and further updates, you can refer to the GitHub Issue.
Further information can also be found in the Prefect documentation.Dijar Vrella
08/17/2024, 7:23 PMMarvin
08/17/2024, 7:23 PMsystemd
service, you'll need to use Prefect's Python client to fetch this data programmatically. This avoids the issues you're experiencing with CLI commands in the systemd
environment.
Here’s a basic example of how you can achieve this:
1. Install Prefect if you haven't already:
sh
pip install prefect
2. Create a Python script to fetch the required flow runs:
python
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
from prefect.client.schemas.sorting import FlowRunSort
async def get_recent_completed_runs():
async with get_client() as client:
now = datetime.now(timezone.utc)
since_time = now - timedelta(minutes=30)
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=FlowRunFilterState(start_time={"after_": since_time})
),
sort=FlowRunSort.END_TIME_DESC
)
return [run for run in flow_runs if run.state.name == "Completed"]
async def get_pending_runs():
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=["SCHEDULED", "LATE", "AWAITING RETRY"])
)
)
)
return flow_runs
if __name__ == "__main__":
completed_runs = asyncio.run(get_recent_completed_runs())
pending_runs = asyncio.run(get_pending_runs())
print("Completed Runs in Last 30 Minutes:", [run.id for run in completed_runs])
print("Pending Runs:", [run.id for run in pending_runs])
3. Set up a Systemd service to run your script:
Create a systemd service file, e.g., `/etc/systemd/system/prefect-flow-status.service`:
ini
[Unit]
Description=Prefect Flow Status Checker
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /path/to/your_script.py
Restart=on-failure
[Install]
WantedBy=multi-user.target
Enable and start the service:
sh
sudo systemctl enable prefect-flow-status.service
sudo systemctl start prefect-flow-status.service
This script will fetch the flow runs that completed in the last 30 minutes and those that are in pending states dynamically, and you can schedule or run this script as needed with systemd
.
Reference: Prefect Client Documentation