# ask-community
c
Hi! Is there a way to find out which worker picked up a specific flow run? For example, I have a single work queue that is polled by 3 different process workers, so flow runs from deployments on that work queue can be picked up by any of the 3 workers. The worker that submitted the flow run is logged at the beginning of the flow run logs, but I don't see an explicit way to get the worker from the flow run without manually looking through the logs.
b
Hi Carlos! I haven't found a direct way of getting the ID of the worker that submitted a flow run. However, I think you may be able to use the REST API or the Prefect client to fetch that info from the flow run logs.
This example gets the flow run logs using the client, and returns them in a list:
from prefect import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId
from prefect.utilities.asyncutils import sync_compatible

@sync_compatible
async def get_flow_run_logs(flow_runs: list[str]):
    async with get_client() as client:
        # read_logs returns logs oldest first by default, so the worker's
        # submission message should appear near the top of the list
        flow_logs = await client.read_logs(
            log_filter=LogFilter(
                flow_run_id=LogFilterFlowRunId(any_=flow_runs)
            )
        )
    print(flow_logs)
    return flow_logs

if __name__ == "__main__":
    get_flow_run_logs(["770ef8cc-2273-484c-a5a8-b5a094249831"])
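The worker name still has to be pulled out of the log message itself. Here is a minimal sketch, assuming the submission message looks like `Worker '<worker-name>' submitting flow run '<flow-run-id>'` (check this against one of your own flow run logs, since the wording may differ between Prefect versions). `extract_worker_name` is a hypothetical helper, not part of Prefect:

```python
import re
from typing import Optional

# ASSUMED message format: Worker '<worker-name>' submitting flow run '<flow-run-id>'
# Verify against a real flow run log before relying on it.
WORKER_LOG_PATTERN = re.compile(r"^Worker '(?P<name>[^']+)' submitting flow run")


def extract_worker_name(message: str) -> Optional[str]:
    """Return the worker name from a submission log message, or None if it doesn't match."""
    match = WORKER_LOG_PATTERN.match(message)
    return match.group("name") if match else None
```

You would call it on the first log of each flow run, e.g. `extract_worker_name(flow_logs[0].message)`. Returning `None` instead of raising lets a caller skip flow runs whose first log turns out to be something else, and matching the quoted name (rather than splitting on whitespace) keeps names that contain spaces intact.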
This one uses the REST API to do the same thing, but sets a limit of 1 so that only the first log (which usually names the worker) is returned:
import requests

# Fill in your own Prefect Cloud account ID, workspace ID, and API key
url = "https://api.prefect.cloud/api/accounts/<your-account-id>/workspaces/<your-workspace-id>/logs/filter"
apikey = "<your-api-key>"

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {apikey}",
}

# Request body: return only the earliest log of the given flow run
payload = {
    "limit": 1,
    "offset": 0,
    "sort": "TIMESTAMP_ASC",
    "logs": {
        "flow_run_id": {
            "any_": ["770ef8cc-2273-484c-a5a8-b5a094249831"]
        }
    },
}

try:
    response = requests.post(url=url, headers=headers, json=payload)
    response.raise_for_status()
    print(response.json())
except requests.RequestException as e:
    print(e)
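Note that `limit` applies to the whole query, not to each flow run, so putting several IDs in `any_` with `limit: 1` would still return a single log overall. A minimal sketch that builds one request body per flow run instead; `build_first_log_payload` and `build_payloads` are hypothetical helpers, and the `"sort": "TIMESTAMP_ASC"` field is spelled out only to make the "earliest log" intent explicit:

```python
from typing import Any, Dict, List


def build_first_log_payload(flow_run_id: str) -> Dict[str, Any]:
    """Body for POST .../logs/filter that returns only the earliest log of one flow run."""
    return {
        "limit": 1,  # caps the entire response, hence one flow run per request
        "offset": 0,
        "sort": "TIMESTAMP_ASC",  # earliest log first
        "logs": {"flow_run_id": {"any_": [flow_run_id]}},
    }


def build_payloads(flow_run_ids: List[str]) -> List[Dict[str, Any]]:
    """One request body per flow run ID."""
    return [build_first_log_payload(run_id) for run_id in flow_run_ids]
```

Each body can then be sent with `requests.post(url, headers=headers, json=body)` in a loop, using the same `url` and `headers` as above.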
c
Thank you, Bianca. Appreciate it!! I'll give it a try tomorrow. I wish there were a more direct way to get this info; it would be very useful for those of us who only use process workers and run multiple workers on a single work queue for resiliency. It would be useful to know how many flow runs each worker is currently running.
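Once each flow run has a worker name (from either of the log-based approaches above), answering "how many flow runs is each worker running" is a grouping step. A minimal sketch, assuming you have already built a `{flow_run_id: worker_name}` mapping; `group_runs_by_worker` and `running_counts` are hypothetical helpers:

```python
from collections import defaultdict
from typing import Dict, List


def group_runs_by_worker(run_to_worker: Dict[str, str]) -> Dict[str, List[str]]:
    """Invert {flow_run_id: worker_name} into {worker_name: [flow_run_id, ...]}."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for run_id, worker in run_to_worker.items():
        grouped[worker].append(run_id)
    return dict(grouped)


def running_counts(run_to_worker: Dict[str, str]) -> Dict[str, int]:
    """Number of flow runs per worker."""
    return {worker: len(ids) for worker, ids in group_runs_by_worker(run_to_worker).items()}
```

Keeping the grouped IDs (not just the counts) means you can also see which specific flow runs a given worker is holding.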
b
NP! Never hurts to submit an idea through GitHub, if you'd like
c
I went ahead and wrote a script that finds the workers running a deployment's flow runs that are currently in a Running state. The filters can be modified as needed. Thought I'd share it for anybody else who runs into this thread:
import asyncio
from collections import Counter
from typing import Optional

from prefect import flow, task
from prefect.artifacts import create_table_artifact
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterName,
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
    LogFilter,
    LogFilterFlowRunId,
)
from prefect.task_runners import ThreadPoolTaskRunner


async def list_deployment_running_flow_runs(deployment_name: str):
    """Return the deployment's flow runs that are currently Running."""
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Running"])
                )
            ),
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=[deployment_name])
            ),
        )
    return flow_runs


@task
async def get_flow_run_worker(flow_run_id: str) -> Optional[str]:
    """Read the flow run's first log line and pull out the worker name.

    Assumes that line looks like: Worker '<worker-name>' submitting flow run '<id>'
    """
    async with get_client() as client:
        first_log_line = await client.read_logs(
            log_filter=LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id])),
            limit=1,
            offset=0,
        )
    if not first_log_line:
        return None  # no logs for this flow run yet
    # Take the text between the first pair of single quotes, so worker names
    # containing spaces (e.g. "ProcessWorker <uuid>") are kept whole
    parts = first_log_line[0].message.split("'")
    return parts[1] if len(parts) > 2 else None


@task
async def create_worker_artifact(workers: list[Optional[str]]):
    # Count Running flow runs per worker, skipping runs with no worker found
    count_dict = dict(Counter(w for w in workers if w))
    await create_table_artifact(
        key="v500-long-running-workers",
        table=[count_dict],
        description="Table containing workers currently running v500 long-running flows",
    )


@flow(task_runner=ThreadPoolTaskRunner(max_workers=15))
async def get_flow_run_workers():
    flow_runs = await list_deployment_running_flow_runs("V500-UPDATE-TABLE")

    # Look up each flow run's worker concurrently; Prefect resolves these
    # futures to their results before create_worker_artifact runs
    worker_names = [get_flow_run_worker.submit(flow_run.id) for flow_run in flow_runs]

    await create_worker_artifact(worker_names)


if __name__ == "__main__":
    asyncio.run(get_flow_run_workers())
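The script above writes `table=[count_dict]`, i.e. a single row whose columns are the worker names. If you would rather have one row per worker, which stays readable as the number of workers grows, here is a minimal sketch of building those rows; `worker_count_rows` is a hypothetical helper, and it skips `None` entries in case your `get_flow_run_worker` returns `None` for flow runs whose worker couldn't be found:

```python
from collections import Counter
from typing import Dict, Iterable, List, Optional, Union


def worker_count_rows(
    worker_names: Iterable[Optional[str]],
) -> List[Dict[str, Union[str, int]]]:
    """One table row per worker, busiest worker first; None entries are skipped."""
    counts = Counter(name for name in worker_names if name is not None)
    return [
        {"worker": name, "running_flow_runs": count}
        for name, count in counts.most_common()
    ]
```

Inside `create_worker_artifact` this would replace `table=[count_dict]` with `table=worker_count_rows(workers)`, since `create_table_artifact` also accepts a list of dicts (one dict per row).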