KG
08/06/2025, 5:39 PMMarvin
08/06/2025, 5:39 PMMarvin
08/06/2025, 5:43 PMpython
from prefect import flow, get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.sorting import FlowRunSort, SortOrder
@flow
async def my_flow():
async with get_client() as client:
# Get the current flow's ID
current_flow_id = flow.get_flow_id()
# Query for the most recent successful run
flow_runs = await client.read_flow_runs(
flow_filter=FlowRunFilter(
flow_id={"any_": [current_flow_id]},
state={"type": {"any_": ["COMPLETED"]}},
),
sort=FlowRunSort.EXPECTED_START_TIME_DESC, # Sort by start time descending
limit=1 # Get only the most recent
)
if flow_runs:
previous_run = flow_runs[0]
previous_scheduled_start = previous_run.expected_start_time
# Now you can use previous_scheduled_start in your flow logic
Key points about this approach:
1. It uses the official PrefectClient
through get_client()
2. The code filters for:
- The current flow's ID (to ensure we're getting runs from the same flow)
- Successful runs (state type "COMPLETED")
3. Results are sorted by expected start time in descending order
4. The limit=1
ensures we get only the most recent run
Important notes:
- This code needs to be in an async flow (notice the async def
)
- The client operations are performed within an async context manager
- The expected_start_time
is what Prefect uses for scheduled start times
- If no previous runs exist, the flow_runs
list will be empty, so make sure to handle that case
You can modify the filter criteria based on your needs. For example, if you want to look back further than just the most recent run, you can adjust the limit
parameter or add additional filtering criteria.
This is the most robust and future-proof way to access previous run information, as it uses the official client API and proper filtering mechanisms.
Let me know if you need any clarification or have questions about specific aspects of this approach!KG
08/06/2025, 5:47 PM