Carlos Cueto
11/06/2024, 8:35 PMBianca Hoch
11/07/2024, 1:01 AMBianca Hoch
11/07/2024, 1:02 AMfrom prefect import get_client
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.utilities.asyncutils import sync_compatible
@sync_compatible
async def get_flow_run_logs(flow_runs: list[str]):
async with get_client() as client:
flow_logs = await client.read_logs(
log_filter=LogFilter(
flow_run_id=LogFilterFlowRunId(any_=flow_runs)
)
)
print(flow_logs)
return flow_logs
if __name__ == "__main__":
get_flow_run_logs(["770ef8cc-2273-484c-a5a8-b5a094249831"])
Bianca Hoch
11/07/2024, 1:05 AMimport requests
import json
url = "<https://api.prefect.cloud/api/accounts/><your-account-id>/workspaces/<your-workspace-id>/logs/filter"
apikey = "<your-api_key>"
print(url)
headers= {
"Content-Type": "application/json",
"Authorization": f"bearer {apikey}",
}
json = {
"limit": 1,
"offset": 0,
"logs": {
"flow_run_id": {
"any_": ["770ef8cc-2273-484c-a5a8-b5a094249831"]
}
}
}
try:
response = <http://requests.post|requests.post>(url=url, headers=headers, json=json)
except Exception as e:
print(e)
print (response.json())
Carlos Cueto
11/07/2024, 1:16 AMBianca Hoch
11/07/2024, 1:21 AMCarlos Cueto
11/07/2024, 7:24 PMfrom prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.server.schemas.filters import DeploymentFilter, FlowRunFilter, FlowRunFilterStateName, FlowRunFilterState, DeploymentFilterName, LogFilter, LogFilterFlowRunId
from prefect.artifacts import create_table_artifact
from prefect.task_runners import ThreadPoolTaskRunner
import asyncio
from collections import Counter
async def list_deployment_running_flow_runs(deployment_name: str):
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=["Running"])
)
),
deployment_filter=DeploymentFilter(
name=DeploymentFilterName(any_=[deployment_name])
)
)
return flow_runs
@task
async def get_flow_run_worker(flow_run_id: str):
async with get_client() as client:
first_log_line = await client.read_logs(
log_filter=LogFilter(
flow_run_id = LogFilterFlowRunId(any_= [flow_run_id])),
limit=1,
offset=0)
return first_log_line[0].message.split()[1].strip("'")
@task
async def create_worker_artifact(workers: list[str]):
count_dict = dict(Counter(workers))
await create_table_artifact(key="v500-long-running-workers", table=[count_dict], description="Table containing workers currently running v500 long-running flows")
@flow(task_runner=ThreadPoolTaskRunner(max_workers=15))
async def get_flow_run_workers():
flow_runs = await list_deployment_running_flow_runs("V500-UPDATE-TABLE")
worker_names = []
for flow_run in flow_runs:
worker_names.append(get_flow_run_worker.submit(flow_run.id))
await create_worker_artifact(worker_names)
if __name__ == '__main__':
asyncio.run(get_flow_run_workers())