<@ULVA73B9P> how can I poll state of a flow run a...
# ask-marvin
p
@Marvin how can I poll state of a flow run and its children using prefect python SDK?
m
To poll the state of a flow run and its child task runs, you can combine the Prefect Python SDK with the Prefect API. Here's how:
1. Poll the state of a flow run: query the Prefect API for the flow run's current state. In the example below, replace `flow_run_id`, `api_key`, and `api_url` with the values for your Prefect Cloud or server setup:
python
   import time
   import requests
   from prefect import get_run_logger, task
   from prefect.deployments import run_deployment

   @task
   def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
       logger = get_run_logger()
       api_key = "YOUR_API_KEY"
       api_url = "YOUR_API_URL"

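       # Terminal state types; without FAILED and CRASHED the loop never exits on a failed run
       terminal_states = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}
       start = time.monotonic()

       while True:
           response = requests.get(f"{api_url}/flow_runs/{flow_run_id}", headers={"Authorization": f"Bearer {api_key}"})
           response.raise_for_status()
           flow_run = response.json()
           logger.info(f"{flow_run['name']} : {flow_run['state_type']}")

           state = flow_run["state_type"]
           if state in terminal_states:
               break
           # Give up once max_run_time (assumed to be in seconds) has elapsed
           if time.monotonic() - start > max_run_time:
               raise TimeoutError(f"Flow run {flow_run_id} did not finish within {max_run_time} seconds")
           time.sleep(log_polling_interval)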
2. Retrieve the state of a flow run's tasks: within a flow, you can check the state of a task while it is in progress using the `get_state()` method on the task's `Future`. Here's an example where an individual task's state is checked:
python
   from prefect import flow, task
   from prefect.task_runners import ConcurrentTaskRunner
   import time

   @task
   def example_task():
       time.sleep(3)
       return {"result": "Example result"}

   @flow(task_runner=ConcurrentTaskRunner)
   def example_flow():
       a = example_task.submit()
       time.sleep(1)
       # get state of task at invocation
       print(a.get_state().type)
       # get value of result upon completion of task
       print(a.result())  # Would block until task completion
The `get_state()` method lets you monitor intermediate states (such as "RUNNING" or "PENDING"), and you can call it periodically to poll the state within your flow. To get the task's final outcome, use `a.result()`, which waits for the task to complete and then returns its result. Combined, these examples give you a way to monitor both the flow run and its tasks. Remember to set the appropriate API key and URL if you are using Prefect Cloud or Prefect Server, and make sure you have the necessary permissions to access the flow run data.
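If the goal is to report progress on a flow run's children from outside the flow, one option is to fetch the child task runs and summarize their states. The sketch below is only an illustration under stated assumptions: `task_run_filter`, `summarize_children`, and `status_line` are hypothetical helper names, the filter body is assumed to be sent as a POST to `{api_url}/task_runs/filter` (worth checking against your Prefect version), and each task run record is assumed to carry `name` and `state_type` keys like the flow run JSON above. The sample records are made up.

```python
from collections import Counter

# Assumption: state types treated as "finished", mirroring the terminal states above.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def task_run_filter(flow_run_id: str) -> dict:
    """Hypothetical helper: JSON body asking for the task runs of one flow run.

    Assumed to be sent as POST {api_url}/task_runs/filter; verify the
    filter shape against your Prefect version before relying on it.
    """
    return {"flow_runs": {"id": {"any_": [flow_run_id]}}}


def summarize_children(task_runs: list[dict]) -> dict:
    """Count child task runs per state type and how many have finished."""
    counts = Counter(run["state_type"] for run in task_runs)
    finished = sum(n for state, n in counts.items() if state in TERMINAL_STATES)
    return {"counts": dict(counts), "finished": finished, "total": len(task_runs)}


def status_line(flow_run: dict, task_runs: list[dict]) -> str:
    """Build a one-line progress message for someone without UI access."""
    summary = summarize_children(task_runs)
    return (
        f"{flow_run['name']}: {flow_run['state_type']} "
        f"({summary['finished']}/{summary['total']} tasks finished)"
    )


# Made-up sample records standing in for API responses.
sample_flow_run = {"name": "example-flow-run", "state_type": "RUNNING"}
sample_task_runs = [
    {"name": "extract-0", "state_type": "COMPLETED"},
    {"name": "transform-0", "state_type": "RUNNING"},
    {"name": "load-0", "state_type": "PENDING"},
]
print(status_line(sample_flow_run, sample_task_runs))
```

In a polling loop like the one in step 1, the same summary could be recomputed on each iteration and passed to whatever channel the end user can see.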
n
hi @Pooyan - can you explain why you feel you need to poll the state from the API directly yourself?
p
Let's say I need to let the end user know which flow is running at the moment and what stage it is at, in case they don't have access to the Prefect UI.
n
i see - thanks for the context!