Ponraj
06/21/2024, 1:31 PMMarvin
06/21/2024, 1:31 PMPrefectFuture
by calling the .wait()
method on it. Here's an example using a simple flow:
python
from prefect import flow, task, PrefectFuture
@task
def add_one(x: int) -> int:
return x + 1
@flow
def my_flow() -> str:
future: PrefectFuture = add_one.submit(1)
state = future.wait() # get the state of the task run
return state.state
if __name__ == "__main__":
flow_state = my_flow()
print(flow_state)
Link to States Concept in Prefect Docs
2. Using the Prefect Client:
You can use the Prefect Client to read the states of flow runs. Example Python snippet to filter flow runs based on states:
python
from prefect.client import get_client
from prefect.filters import FlowRunFilter
import asyncio
async def get_flow_runs(states: list[str]):
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=dict(name=dict(any_=states))
)
)
return flow_runs
if __name__ == "__main__":
flow_runs = asyncio.run(get_flow_runs([“Completed”, "Running"]))
for run in flow_runs:
print(f"Flow Run ID: {run.id}, State: {run.state.name}")
Link to Using the Prefect Client Guide
3. GitHub Issues:
There are several GitHub issues discussing flow run states and related filtering capabilities which might be beneficial:
- Filter Flows by most recent flow run state
- Support subflow cancellation if parent flow is in terminal state
These resources should help you programmatically determine the state of your flow runs or filter them based on specific states like "Completed", "Scheduled", "Running", etc.Ponraj
06/21/2024, 1:59 PMMarvin
06/21/2024, 2:00 PM--watch
command in the Prefect CLI or programmatically poll the flow state using the Prefect client.
### Using CLI
Prefect has recently introduced the --watch
command. This command provides real-time monitoring of flow runs, displaying updates until the run is completed. Here’s an example:
bash
prefect deployment run --watch <slugified-flow-name>/<slugified-deployment-name>
This will monitor the specified flow run and display updates in real-time until the flow completes.
Link to GitHub PR for `--watch` option
### Using Prefect Client with Polling
If you prefer to programmatically monitor the state, you can continuously poll the flow state using the Prefect client. Here’s a Python snippet that demonstrates this:
python
from prefect.client.orchestration import get_client
from prefect.states import State
import asyncio
async def get_flow_run_state(flow_run_id: str):
async with get_client() as client:
while True:
flow_run = await client.read_flow_run(flow_run_id)
state = flow_run.state
print(f"Current state: {state.name}")
if state.is_final():
break
await asyncio.sleep(1) # Adjust the sleep duration as needed
if __name__ == "__main__":
flow_run_id = "<your-flow-run-id>"
asyncio.run(get_flow_run_state(flow_run_id))
Using either method, you can effectively monitor the real-time state of your flow runs.
Link to Prefect States concept documentation
Refer to these resources for more information and additional context:
- Option to stream flow run logs for streaming logs alongside state monitoring.
- Support subflow cancellation if parent flow is in terminal state for complex use cases involving subflows.
By leveraging the --watch
command in the CLI or using the Prefect client to poll states, you can keep track of your flows' statuses in real-time.Ponraj
06/21/2024, 3:07 PM