Gregory Hunt

09/04/2023, 3:00 PM
Is there a way to filter client.read_flow_runs by last updated date?
I am running an autoscaling process worker, and sometimes the worker containers crash, which leaves flow runs hanging in a pending or running state. I would like to create a job that reschedules flow runs stuck in those states that have not been updated within a certain time frame.
@Nate you are the read_flow_runs filter expert, any ideas?
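from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.sorting import FlowRunSort

# PENDING/RUNNING flow runs that started before `start_time`, oldest first
runs = await client.read_flow_runs(
    sort=FlowRunSort.EXPECTED_START_TIME_ASC,
    flow_run_filter=FlowRunFilter(
        start_time=FlowRunFilterStartTime(before_=start_time),
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=["PENDING", "RUNNING"])
        ),
    ),
)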
I have this so far, but I'm unsure what value maps to last updated?

Nate

09/05/2023, 5:00 PM
what value maps to last updated?
do you mean you'd like to filter by when a flow run last entered a new state?

Gregory Hunt

09/05/2023, 5:16 PM
@Nate yeah I want to be able to find and reschedule orphaned flows in the pending or running status when a process worker crashes or restarts
@Nate so far it seems to be tricky: since these flows are created via an API call, they only have the expected_start_time, and we do have bursty loads. I am trying to figure out how to write a job that will requeue ones that get stuck in a "PENDING" or "RUNNING" state.
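One possible approach to the requeue job described above, as a rough sketch rather than a confirmed answer from the thread: fetch the PENDING/RUNNING runs with the existing state-type filter, then compare each run's state.timestamp (when it entered its current state) against a cutoff on the client side. The names find_stale_runs, requeue_stale_runs, and the 30-minute threshold are hypothetical. Note that state.timestamp is not a heartbeat, so a healthy long-running flow would also look "stale" by this measure; forcing runs back to Scheduled is likewise an assumption about what is acceptable in this setup.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def find_stale_runs(runs, max_age, now=None):
    """Return runs whose current state was entered more than `max_age` ago.

    Each run is expected to expose `state.timestamp` (timezone-aware).
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - max_age
    return [r for r in runs if r.state is not None and r.state.timestamp < cutoff]


async def requeue_stale_runs(max_age=timedelta(minutes=30)):
    """Hypothetical reaper job: reschedule PENDING/RUNNING runs that look orphaned."""
    # Imported lazily so find_stale_runs can be used without Prefect installed.
    from prefect import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.states import Scheduled

    async with get_client() as client:
        # Only one page of results is read here; a real job may need to paginate.
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=["PENDING", "RUNNING"])
                )
            )
        )
        for run in find_stale_runs(runs, max_age):
            # Assumption: bypassing orchestration rules with force=True is acceptable.
            await client.set_flow_run_state(run.id, state=Scheduled(), force=True)


# Offline demo of the selection logic with stand-in objects (no Prefect needed).
now = datetime(2023, 9, 5, 17, 0, tzinfo=timezone.utc)
demo_runs = [
    SimpleNamespace(id="fresh", state=SimpleNamespace(timestamp=now - timedelta(minutes=5))),
    SimpleNamespace(id="stale", state=SimpleNamespace(timestamp=now - timedelta(hours=2))),
]
stale_ids = [r.id for r in find_stale_runs(demo_runs, timedelta(minutes=30), now=now)]
```

The reaper itself could be run on a schedule with asyncio.run(requeue_stale_runs()); keeping the staleness check in a plain function makes it easy to test without a Prefect server.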