Hoping someone can help me out here. What's the co...
# ask-community
j
Hoping someone can help me out here. What's the correct way to use async tasks in an async flow? I have a simple flow that gets some flow runs from the prefect client, checks their state to see if they should be canceled, and then cancels them. Except it doesn't work. The logs tell me that the first task
get-flow-runs
finishes in state Completed but then the task that uses the output from that throws an error that
get-flow-runs
is in a Pending state. How can both be true?
Correction: Looking at the UI it looks like
get-flow-runs
is Completed but the second task is in a NotReady state. How does that happen? It doesn't seem to matter if I call the task directly or if I use submit. The only thing that resolves it is removing the task decorator/calling the .fn method. Is there something simple I'm missing here?
Copy code
@flow
async def cancel_stuck_flows():
    logger = get_logger()
    flow_runs = await get_flow_runs()
    to_cancel = await filter_flow_runs(flow_runs)

    if not to_cancel:
        return

    # logger.info(f"Cancelling {len(to_cancel)} flow runs")
    futures = await cancel_flow_run.map(to_cancel)
    for future in futures:
        print(type(future), future)
        future.wait()
Logs:
Copy code
07:42:29.839 | INFO    | Flow run 'strange-moose' - Created task run 'get_flow_runs-0' for task 'get_flow_runs'
07:42:29.842 | INFO    | Flow run 'strange-moose' - Executing 'get_flow_runs-0' immediately...
07:42:32.347 | INFO    | Task run 'get_flow_runs-0' - Finished in state Completed()
07:42:32.349 | WARNING | Task run 'get_flow_runs-0' - Task run 'get_flow_runs-0' finished in state Completed()
07:42:32.609 | INFO    | Flow run 'strange-moose' - Created task run 'filter_flow_runs-0' for task 'filter_flow_runs'
07:42:32.610 | INFO    | Flow run 'strange-moose' - Executing 'filter_flow_runs-0' immediately...
07:42:32.820 | ERROR   | Flow run 'strange-moose' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/engine.py", line 864, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/jessica/source/orion/Prefect-2/flows/maint/cancel_stuck_flows.py", line 76, in cancel_stuck_flows
    to_cancel = await filter_flow_runs(flow_runs)
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 182, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/engine.py", line 1571, in get_task_call_return_value
    return await future._result()
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/home/jessica/source/orion/Prefect-2/.venv/lib/python3.10/site-packages/prefect/states.py", line 84, in _get_state_result
    raise UnfinishedRun(
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
n
hi @Jessica Smith - do you happen to have an MRE that gives this exception? its hard to know what exactly is going on inside your helpers
in general, the answer to this > What's the correct way to use async tasks in an async flow? is just like
async
python functions in another
async
python function, plus
submit
and
map
j
@Nate Sorry, wasn't able to get back to this yesterday. Here's an MRE - it doesn't do much, just gets flows from the client and then passes them to a function that adds them to a new list and returns them (the real logic would have filtering here). But even with only these functions it still fails
Copy code
import asyncio

from prefect import flow, get_client, task
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import FlowRun, StateType


@task
async def get_flow_runs() -> list[FlowRun]:
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING, StateType.PENDING]),
                )
            )
        )

        return flow_runs


@task(task_run_name="cancel_{flow_run.name}")
async def read_flow_run_states(flow_run: FlowRun):
    async with get_client() as client:
        res = await client.read_flow_run_states(flow_run_id=flow_run.id)

        return res


@task
async def filter_flow_runs(flow_runs: list[FlowRun]):
    to_cancel: list[FlowRun] = []
    for fr in flow_runs:
        to_cancel.append(fr)

    return to_cancel


@flow
async def cancel_stuck_flows():
    flow_runs = await get_flow_runs.submit()
    to_cancel = await filter_flow_runs(flow_runs)

    if not to_cancel:
        return

    futures = await read_flow_run_states.map(to_cancel)
    for future in futures:
        print(type(future), future)
        future.wait()


if __name__ == "__main__":
    asyncio.run(cancel_stuck_flows())
n
thanks so much! this is helpful - will take a look at this today