Slackbot
01/08/2025, 7:38 PM
Marvin
01/08/2025, 7:39 PM
FlowRunContext.get() to access the current flow run context.
2. Query for previous flow runs: Use the Prefect client to query for flow runs, sorting them by their start time. You'll want to filter these to get only the relevant runs of your flow.
Here's a conceptual example:
```python
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId
from prefect.client.schemas.sorting import FlowRunSort
from prefect.context import FlowRunContext


@flow
def my_flow():
    # Get the current flow run context
    flow_run_context = FlowRunContext.get()

    # Initialize a synchronous client (get_client() returns an async client by default)
    with get_client(sync_client=True) as client:
        # Query for the most recent runs of this flow, newest first
        flow_runs = client.read_flow_runs(
            flow_filter=FlowFilter(
                id=FlowFilterId(any_=[flow_run_context.flow_run.flow_id])
            ),
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
            limit=2,  # Get the last two runs to find the previous one
        )

    if len(flow_runs) > 1:
        last_run = flow_runs[1]
        print("Last run start time:", last_run.start_time)
    else:
        print("No previous runs found.")


if __name__ == "__main__":
    my_flow()
```
This example assumes you have a running Prefect server and the necessary setup to connect to it. You can find more details in the Prefect documentation.
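Indexing flow_runs[1] relies on the current run always sorting first. A slightly more defensive option is to skip the current run by ID. The sketch below is only an illustration: Run is a hypothetical stand-in for a flow run record and previous_run is a hypothetical helper, neither of which is part of Prefect.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Sequence
from uuid import UUID, uuid4


@dataclass
class Run:
    """Hypothetical stand-in for a flow run record (not a Prefect class)."""

    id: UUID
    start_time: Optional[datetime]


def previous_run(runs: Sequence[Run], current_id: UUID) -> Optional[Run]:
    """Return the first run in `runs` that is not the current one.

    Assumes `runs` is already sorted newest first, as in the query above.
    """
    for run in runs:
        if run.id != current_id:
            return run
    return None


# Two made-up runs: the one executing now and an older one.
current = Run(id=uuid4(), start_time=datetime(2025, 1, 8, 19, 0))
older = Run(id=uuid4(), start_time=datetime(2025, 1, 7, 19, 0))

last_run = previous_run([current, older], current.id)
print("Last run start time:", last_run.start_time if last_run else None)
```

With real query results, the current ID would come from the flow run context rather than from a made-up object.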
Keep in mind some common gotchas in Prefect 3.x:
- Agents are replaced by workers, and work pools replace infrastructure blocks.
- Ensure you're using the correct client and context methods as per Prefect 3.x.
For more information, you can explore the Prefect API and Prefect GitHub source.
Marvin
01/08/2025, 7:44 PM
read_flow_runs are asynchronous, and you need to await them. Here's how you can modify your code to handle this:
1. Use async in your flow definition: Make your flow function asynchronous.
2. await the asynchronous methods: Use await when calling methods like read_flow_runs.
Here's an updated version of your flow function:
```python
import asyncio

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId
from prefect.client.schemas.sorting import FlowRunSort
from prefect.context import FlowRunContext


@flow
async def my_flow():
    # Get the current flow run context
    flow_run_context = FlowRunContext.get()

    # Initialize the client
    async with get_client() as client:
        # Query for previous flow runs of this flow, newest first
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(
                id=FlowFilterId(any_=[flow_run_context.flow_run.flow_id])
            ),
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
            limit=2,  # Get the last two runs to find the previous one
        )

    if len(flow_runs) > 1:
        last_run = flow_runs[1]
        print("Last run start time:", last_run.start_time)
    else:
        print("No previous runs found.")


# Run the flow; an async flow has to be driven by an event loop
if __name__ == "__main__":
    asyncio.run(my_flow())
```
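A TypeError of this kind can be reproduced without Prefect. In the sketch below, fetch_runs is a hypothetical coroutine standing in for an async client method such as read_flow_runs: calling it without await returns a coroutine object, and len() on that object raises TypeError.

```python
import asyncio


async def fetch_runs() -> list[str]:
    """Hypothetical stand-in for an async client call such as read_flow_runs."""
    await asyncio.sleep(0)  # yield to the event loop, as a real network call would
    return ["run-2", "run-1"]


async def main() -> tuple[str, int]:
    # Without await, the call returns a coroutine object rather than a list.
    pending = fetch_runs()
    try:
        len(pending)
        error = ""
    except TypeError as exc:
        error = type(exc).__name__
    finally:
        pending.close()  # avoid a "coroutine was never awaited" warning

    # With await, the coroutine runs to completion and returns the list.
    runs = await fetch_runs()
    return error, len(runs)


result = asyncio.run(main())
print(result)
```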
This should resolve the TypeError by properly awaiting the asynchronous call. Remember, in Python, await is used to pause the execution of the coroutine until the awaited coroutine is complete. For more details, you can refer to the Prefect documentation.
Marvin
01/08/2025, 7:45 PM
get_client() to get a client instance.
2. Query flow runs: Use client.read_flow_runs() with appropriate filters and sorting to get the last run.
Here's a code example:
```python
import asyncio

from prefect import flow, get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId
from prefect.client.schemas.sorting import FlowRunSort
from prefect.context import FlowRunContext


@flow
async def my_flow():
    # The flow ID is read from the current flow run context
    flow_id = FlowRunContext.get().flow_run.flow_id

    # Initialize the client
    async with get_client() as client:
        # Query for previous flow runs of this flow, newest first
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(id=FlowFilterId(any_=[flow_id])),
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
            limit=2,  # Get the last two runs to find the previous one
        )

    if len(flow_runs) > 1:
        last_run = flow_runs[1]
        print("Last run start time:", last_run.start_time)
    else:
        print("No previous runs found.")


# Run the flow
if __name__ == "__main__":
    asyncio.run(my_flow())
```
Note that the read_flow_runs method needs to be awaited in an asynchronous context, so this example defines the flow with async def and starts it with asyncio.run. You can refer to the Prefect 2.x documentation for more details.