Stéphan Taljaard
11/30/2023, 10:25 AMMarvin
11/30/2023, 10:26 AMimport asyncio
from prefect import Client
async def get_most_recent_flow_runs():
client = Client()
last_3_flow_runs = await client.get_flow_runs(order_by={"end_time": "desc"}, limit=3)
return last_3_flow_runs
last_3_flow_runs = asyncio.run(get_most_recent_flow_runs())
print(last_3_flow_runs)
assert all(run.state.is_completed() for run in last_3_flow_runs)
assert (end_times := [run.end_time for run in last_3_flow_runs]) == sorted(end_times, reverse=True)
You can find more details in the Prefect Client guide.
2. Prefect Cloud retains flow runs according to the Flow Run Retention Policy setting in your personal account or organization profile. This policy represents the number of days each flow run is available in the Prefect Cloud UI, and via the Prefect CLI and API. You can use these interfaces to access the metadata of the flow runs. More details can be found in the Prefect Cloud rate limits guide.
3. Deployments in Prefect Cloud can take a trigger specification, which configures an automation to run the deployment based on the presence or absence of events. The event data can be passed into the deployment run as parameters via Jinja templating. This could be a way to pass the original flow run's info/metadata to the triggered deployment flow run. More details can be found in the Prefect Cloud deployments guide.
Please note that these methods require you to be using Prefect Cloud, as some features may not be available in Prefect Core.