Hi One of Prefect flows is being marked as Complet...
# ask-community
s
Hi One of Prefect flows is being marked as Completed in the UI, while several tasks within the flow are still shown as Running. Could you help me understand what might cause this discrepancy and how to resolve it?
b
Hi @Srikar Kolukuluri, can you share any snippets of the code you are using?
Are you using the
.submit()
method to call the tasks from your flow?
s
yeah, I'm using .submit() and wait_for in my function something like this...this function will run in async and run for around 25 tasks parallelly
Copy code
async def process(
   **arg
) -> None:

    in_prog = log_audit_status.submit(
        **arg
    )
    in_prog.result()

    # Delete existing  data
    del_fut = delete_existing_backfill_data.submit(
        **arg
        wait_for=[in_prog],
    )
    del_fut.result()
    )

    # run the command
    cmd_state: State = shell_run_command.submit(
        **arg
        return_state=True,
        wait_for=[del_fut],
    )
    cmd_state.result()
b
Is the
process
method your flow?
Or are you calling
process
in your flow?
s
I'm calling process method in my flow
👍 1
b
Are you awaiting the
process
method results?
s
yeah, I was awaiting till all the streams got completed futures = [] for company in companies: for stream, primary_keys in streams_dict.items(): fut = process.submit( **args ) futures.append(fut) for fut in futures: fut.result()
b
Since your
process
method is defined as
async
, you'll need to use
asyncio.gather
instead of
.result()
https://docs.prefect.io/v3/how-to-guides/workflows/run-work-concurrently#using-asyncio
I think you could remove
async
from your
process
method and it should work without modification.
Out of curiosity, are your tasks that you're calling inside the
process
method also async?
s
yes, all are async functions
b
Out of curiosity, are most of the "running" tasks you're seeing from the
shell_run_command
call?
I think you're running into this issue: https://github.com/PrefectHQ/prefect/issues/9317
When you call this code inside
process
, it returns the final state result, not a future. Calling
.result()
on it causes it to not be awaited correctly.
Copy code
# run the command
    cmd_state: State = shell_run_command.submit(
        **arg,
        return_state = True,
        wait_for = [del_fut]
    )
    cmd_state.result()
This is the output I get:
Copy code
11:56:32.738 | INFO    | Task run 'shell_run_command-a0d' - Finished in state Completed()
11:56:32.741 | INFO    | Task run 'shell_run_command-245' - Finished in state Completed()
11:56:32.743 | INFO    | Task run 'shell_run_command-772' - Finished in state Completed()
11:56:32.749 | INFO    | Task run 'process-182' - Finished in state Completed()
11:56:32.750 | INFO    | Task run 'process-475' - Finished in state Completed()
/Users/brendan/Documents/PycharmProjects/prefect/demos/examples/async.py:42: RuntimeWarning: coroutine 'State.aresult' was never awaited
  cmd_state.result()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
11:56:32.755 | INFO    | Task run 'process-d98' - Finished in state Completed()
11:56:32.900 | INFO    | Flow run 'papaya-trout' - Finished in state Completed()
If you remove
cmd_state.result()
inside
process
it should resolve the error
s
I'm seeing this issue across all functions, starting from
log_audit_status
. Some of them are getting stuck right at the first function. I’ve converted all my async functions to synchronous ones, but unfortunately, that hasn’t resolved the issue. Is there anything else I should be looking into or taking care of?