Tarun Amasa
09/06/2024, 5:33 PMChris White
Pratham
09/06/2024, 5:50 PMPratham
09/08/2024, 5:16 AMrun_coro_as_sync
instead just slower (the blocking portal iirc opportunistically makes new threads)Pratham
09/08/2024, 5:16 AMPratham
09/08/2024, 5:20 AMimport asyncio
from endex.prefect.executors.AsyncEnabledTaskRunner import AsyncEnabledTaskRunner
from prefect import task
from prefect.flows import flow
from prefect.futures import wait
@task(task_run_name="nested_task-{batch_index}")
async def nested_task(batch_index: int):
"""
A nested task that is called by the parent task.
"""
await asyncio.sleep(batch_index)
return batch_index
@task(name="parent_task")
async def parent_task():
"""
A task that calls a nested task.
"""
n_items = 5
# Call the nested task in parallel
tasks = []
for i in range(n_items):
tasks.append(nested_task.submit(batch_index=i))
# Wait for all tasks to complete
wait(tasks)
return sum([task.result() for task in tasks])
@task(name="parent_task-2")
async def parent_task_2():
"""
A task that calls a nested task.
"""
n_items = 10
# Call the nested task in parallel
tasks = []
for i in range(n_items):
tasks.append(nested_task.submit(batch_index=i))
# Wait for all tasks to complete
wait(tasks)
return sum([task.result() for task in tasks])
@flow(name="nested_tasks", task_runner=AsyncEnabledTaskRunner(max_workers=10))
def nested_tasks_flow():
"""
A flow that calls a parent task.
"""
task = parent_task.submit()
task_2 = parent_task_2.submit()
return task.result(), task_2.result()
if __name__ == "__main__":
flow = nested_tasks_flow()
import asyncio
asyncio.run(flow)
the wait in the first parent is blocking correct? this is what im seeing in my logsChris White
wait
is a blocking call - we have plans to support async waits (awaits ha) soon though; if your AsyncEnabledTaskRunner
handles async better than ours, feel free to open either a PR or an issue describing the changes and we can consider incorporating it into the codebase!!Pratham
09/09/2024, 3:51 AM