Selora
02/11/2025, 3:46 PM
Marvin
02/11/2025, 3:46 PM
Use the `submit` method to run async tasks within a sync flow. Here's a basic example:
```python
from prefect import flow, task

@task
async def my_async_task():
    pass

@flow
def my_sync_flow():
    async_results = [my_async_task.submit() for _ in range(5)]
    for result in async_results:
        # Use `result.result()` to block and get the result of the async task
        result.result()

my_sync_flow()
```
In this example, `my_async_task` is an asynchronous task that you want to run multiple times. The `submit` method is used to schedule these tasks, and `result.result()` is used to block and retrieve the results synchronously.
You can find more about task submission and flow execution in the Prefect tasks documentation.
If you have more specific requirements or configurations, feel free to ask!
Selora
02/11/2025, 3:50 PM
Nate
02/11/2025, 3:56 PM
`as_completed` if you want that
Selora
02/11/2025, 3:59 PM
Nate
02/11/2025, 4:01 PM
`from prefect.futures`
https://github.com/prefecthq/prefect/blob/main/src/prefect/futures.py#L343-L381
so the resulting list of futures from `map` (or a list comp of submits) can be provided to that, and then you can process them as they finish
Selora
02/11/2025, 4:02 PM
Selora
02/11/2025, 4:30 PM
`got Future <Future pending> attached to a different loop`
but only on task failures, from the `.map().result()` operation.
It looks like Prefect manages the async loops, and getting the future results in certain cases is problematic?
Nate
02/11/2025, 4:30 PM
Selora
02/11/2025, 4:32 PM
Selora
02/11/2025, 5:25 PM
`aiodns.Resolver(loop=get_current_loop())`
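```python
async def get_resolver() -> aiodns.Resolver:
    pass

@task
async def task(query: str):
    # get_resolver is a coroutine function, so await it before calling query
    resolver = await get_resolver()
    await resolver.query(query)
```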
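For reference, the error message itself can be reproduced with plain `asyncio`, without Prefect or aiodns. The sketch below assumes (and this is only a guess about the resolver snippet above) that a loop-bound client is created once and then reused from a different event loop. `FakeResolver`, `get_shared_resolver`, and `get_loop_local_resolver` are hypothetical names, not part of any library.

```python
import asyncio
import weakref


class FakeResolver:
    """Hypothetical stand-in for a client that binds to one event loop."""

    def __init__(self):
        # Remember the loop that was running when the client was created.
        self._loop = asyncio.get_running_loop()

    async def query(self, name):
        # The future belongs to the remembered loop, which may not be the running one.
        fut = self._loop.create_future()
        # Complete it on the next iteration of the loop that is actually running.
        asyncio.get_running_loop().call_soon(fut.set_result, f"resolved {name}")
        return await fut


_shared = None  # one client for the whole process
_per_loop = weakref.WeakKeyDictionary()  # one client per event loop


async def get_shared_resolver():
    global _shared
    if _shared is None:
        _shared = FakeResolver()
    return _shared


async def get_loop_local_resolver():
    loop = asyncio.get_running_loop()
    if loop not in _per_loop:
        _per_loop[loop] = FakeResolver()
    return _per_loop[loop]


async def lookup(get_resolver, name):
    resolver = await get_resolver()
    return await resolver.query(name)


def demo():
    # Each asyncio.run() call creates a fresh event loop.
    first = asyncio.run(lookup(get_shared_resolver, "a.example"))
    try:
        asyncio.run(lookup(get_shared_resolver, "b.example"))
        shared_error = None
    except RuntimeError as exc:
        shared_error = str(exc)
    local = [
        asyncio.run(lookup(get_loop_local_resolver, name))
        for name in ("a.example", "b.example")
    ]
    return first, shared_error, local


if __name__ == "__main__":
    print(demo())
```

Keying the cache on the running loop is one possible workaround. Whether it matches how Prefect schedules async tasks in this case is not established in the thread.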
Selora
02/11/2025, 5:28 PM