Jessica Smith
02/08/2024, 1:45 PMget-flow-runs
finishes in state Completed but then the task that uses the output from that throws an error that get-flow-runs
is in a Pending state. How can both be true?get-flow-runs
is Completed but the second task is in a NotReady state. How does that happen?
It doesn't seem to matter if I call the task directly or if I use submit. The only thing that resolves it is removing the task decorator/calling the .fn method. Is there something simple I'm missing here?
@flow
async def cancel_stuck_flows():
logger = get_logger()
flow_runs = await get_flow_runs()
to_cancel = await filter_flow_runs(flow_runs)
if not to_cancel:
return
# logger.info(f"Cancelling {len(to_cancel)} flow runs")
futures = await cancel_flow_run.map(to_cancel)
for future in futures:
print(type(future), future)
future.wait()
Logs:
07:42:29.839 | INFO | Flow run 'strange-moose' - Created task run 'get_flow_runs-0' for task 'get_flow_runs'
07:42:29.842 | INFO | Flow run 'strange-moose' - Executing 'get_flow_runs-0' immediately...
07:42:32.347 | INFO | Task run 'get_flow_runs-0' - Finished in state Completed()
07:42:32.349 | WARNING | Task run 'get_flow_runs-0' - Task run 'get_flow_runs-0' finished in state Completed()
07:42:32.609 | INFO | Flow run 'strange-moose' - Created task run 'filter_flow_runs-0' for task 'filter_flow_runs'
07:42:32.610 | INFO | Flow run 'strange-moose' - Executing 'filter_flow_runs-0' immediately...
07:42:32.820 | ERROR | Flow run 'strange-moose' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/engine.py", line 864, in orchestrate_flow_run
result = await flow_call.aresult()
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
return await asyncio.wrap_future(self.future)
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
result = await coro
File "/home/jessica/source/orion/Prefect-2/flows/maint/cancel_stuck_flows.py", line 76, in cancel_stuck_flows
to_cancel = await filter_flow_runs(flow_runs)
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 182, in wait_for_call_in_loop_thread
return call.result()
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
return self.future.result(timeout=timeout)
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
result = await coro
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1571, in get_task_call_return_value
return await future._result()
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/states.py", line 84, in _get_state_result
raise UnfinishedRun(
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
Nate
02/08/2024, 3:02 PMNate
02/08/2024, 3:03 PMasync
python functions in another async
python function, plus submit
and map
Jessica Smith
02/09/2024, 1:08 PMimport asyncio
from prefect import flow, get_client, task
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import FlowRun, StateType
@task
async def get_flow_runs() -> list[FlowRun]:
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=FlowRunFilterState(
type=FlowRunFilterStateType(any_=[StateType.RUNNING, StateType.PENDING]),
)
)
)
return flow_runs
@task(task_run_name="cancel_{flow_run.name}")
async def read_flow_run_states(flow_run: FlowRun):
async with get_client() as client:
res = await client.read_flow_run_states(flow_run_id=flow_run.id)
return res
@task
async def filter_flow_runs(flow_runs: list[FlowRun]):
to_cancel: list[FlowRun] = []
for fr in flow_runs:
to_cancel.append(fr)
return to_cancel
@flow
async def cancel_stuck_flows():
flow_runs = await get_flow_runs.submit()
to_cancel = await filter_flow_runs(flow_runs)
if not to_cancel:
return
futures = await read_flow_run_states.map(to_cancel)
for future in futures:
print(type(future), future)
future.wait()
if __name__ == "__main__":
asyncio.run(cancel_stuck_flows())
Nate
02/09/2024, 2:23 PM