Srikar Kolukuluri
07/02/2025, 1:05 PM
Brendan Dalpe
07/02/2025, 1:23 PM
Brendan Dalpe
07/02/2025, 1:23 PM
.submit() method to call the tasks from your flow?
Srikar Kolukuluri
07/02/2025, 1:45 PM
async def process(
    **arg
) -> None:
    in_prog = log_audit_status.submit(
        **arg
    )
    in_prog.result()
    # Delete existing data
    del_fut = delete_existing_backfill_data.submit(
        **arg,
        wait_for=[in_prog],
    )
    del_fut.result()
    # run the command
    cmd_state: State = shell_run_command.submit(
        **arg,
        return_state=True,
        wait_for=[del_fut],
    )
    cmd_state.result()
Brendan Dalpe
07/02/2025, 2:14 PM
process method your flow?
Brendan Dalpe
07/02/2025, 2:14 PM
process in your flow?
Srikar Kolukuluri
07/02/2025, 2:36 PM
Brendan Dalpe
07/02/2025, 2:38 PM
process method results?
Srikar Kolukuluri
07/02/2025, 3:05 PM
Brendan Dalpe
07/02/2025, 3:17 PM
process method is defined as async, you'll need to use asyncio.gather instead of .result()
https://docs.prefect.io/v3/how-to-guides/workflows/run-work-concurrently#using-asyncio
Brendan Dalpe
07/02/2025, 3:19 PM
async from your process method and it should work without modification.
Brendan Dalpe
07/02/2025, 3:19 PM
process method also async?
Srikar Kolukuluri
07/02/2025, 4:03 PM
Brendan Dalpe
07/02/2025, 4:55 PM
shell_run_command call?
Brendan Dalpe
07/02/2025, 4:56 PM
Brendan Dalpe
07/02/2025, 4:58 PM
process, it returns the final state result, not a future. Calling .result() on it causes it to not be awaited correctly.
    # run the command
    cmd_state: State = shell_run_command.submit(
        **arg,
        return_state=True,
        wait_for=[del_fut],
    )
    cmd_state.result()
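The point above, that calling .result() inside an async function can leave something un-awaited, can be reproduced with the standard library alone. This is only a sketch: FakeState and its aresult method are hypothetical stand-ins written for this example, not Prefect's State class, and the two process_* functions are likewise made up.

```python
import asyncio
import inspect


class FakeState:
    """Hypothetical stand-in for a state object; not Prefect's State class."""

    def __init__(self, value):
        self._value = value

    async def aresult(self):
        # Async accessor: calling it only creates a coroutine object.
        return self._value


async def process_without_await():
    state = FakeState("done")
    pending = state.aresult()  # no await: nothing has run yet
    is_coro = inspect.iscoroutine(pending)
    pending.close()  # close explicitly so this demo does not emit the warning
    return is_coro


async def process_with_await():
    state = FakeState("done")
    return await state.aresult()  # awaited: the value is actually produced


forgot = asyncio.run(process_without_await())
value = asyncio.run(process_with_await())
print(forgot, value)
```

If the pending.close() line were dropped, Python would report that the coroutine was never awaited once the object is garbage collected, which is the general shape of warning that forgetting an await produces.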
Brendan Dalpe
07/02/2025, 4:59 PM
11:56:32.738 | INFO | Task run 'shell_run_command-a0d' - Finished in state Completed()
11:56:32.741 | INFO | Task run 'shell_run_command-245' - Finished in state Completed()
11:56:32.743 | INFO | Task run 'shell_run_command-772' - Finished in state Completed()
11:56:32.749 | INFO | Task run 'process-182' - Finished in state Completed()
11:56:32.750 | INFO | Task run 'process-475' - Finished in state Completed()
/Users/brendan/Documents/PycharmProjects/prefect/demos/examples/async.py:42: RuntimeWarning: coroutine 'State.aresult' was never awaited
cmd_state.result()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
11:56:32.755 | INFO | Task run 'process-d98' - Finished in state Completed()
11:56:32.900 | INFO | Flow run 'papaya-trout' - Finished in state Completed()
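The log above shows three process runs finishing side by side. As a rough illustration of the earlier suggestion to use asyncio.gather instead of .result() when process is async, here is a sketch in plain asyncio. It does not use Prefect at all: step is a hypothetical helper, and the task names are reused only as string labels.

```python
import asyncio

calls = []  # records (item, step name) in the order steps complete


async def step(name, item):
    # Hypothetical stand-in for one task; a real task would do I/O here.
    await asyncio.sleep(0)
    calls.append((item, name))
    return f"{name}:{item}"


async def process(item):
    # Awaiting each step in turn enforces the order within one item.
    await step("log_audit_status", item)
    await step("delete_existing_backfill_data", item)
    return await step("shell_run_command", item)


async def main():
    # gather runs the three process coroutines concurrently and
    # returns their results in the order they were passed in.
    return await asyncio.gather(*(process(i) for i in range(3)))


results = asyncio.run(main())
print(results)
```

Within each item the three steps still run strictly in sequence, while the items themselves interleave; nothing blocks the event loop the way a synchronous wait would.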
Brendan Dalpe
07/02/2025, 4:59 PM
cmd_state.result() inside process it should resolve the error
Srikar Kolukuluri
07/08/2025, 2:21 PM
log_audit_status. Some of them are getting stuck right at the first function.
I’ve converted all my async functions to synchronous ones, but unfortunately, that hasn’t resolved the issue.
Is there anything else I should be looking into or taking care of?