so can you not invoke tasks within other tasks?
# prefect-cloud
t
so can you not invoke tasks within other tasks?
c
2.x does not allow this, but 3.0+ does!
p
seeing issues with this when trying to submit an array of tasks and then waiting on the list, where the parent task isn't completed. can make a MRE shortly
🙏 2
ok figured out the issue, i made a custom task runner using anyio blocking portal that interfers with the logging somehow. btw the default threadpool executor breakds under normal async usage because of event_loop shenanigans. works fine if i use your guys'
run_coro_as_sync
instead just slower (the blocking portal iirc opportunistically makes new threads)
😅
also, if i have a pattern like this:
Copy code
import asyncio

from endex.prefect.executors.AsyncEnabledTaskRunner import AsyncEnabledTaskRunner
from prefect import task
from prefect.flows import flow
from prefect.futures import wait


@task(task_run_name="nested_task-{batch_index}")
async def nested_task(batch_index: int):
    """
    A nested task that is called by the parent task.
    """
    await asyncio.sleep(batch_index)
    return batch_index


@task(name="parent_task")
async def parent_task():
    """
    A task that calls a nested task.
    """
    n_items = 5

    # Call the nested task in parallel
    tasks = []
    for i in range(n_items):
        tasks.append(nested_task.submit(batch_index=i))

    # Wait for all tasks to complete
    wait(tasks)

    return sum([task.result() for task in tasks])

@task(name="parent_task-2")
async def parent_task_2():
    """
    A task that calls a nested task.
    """
    n_items = 10

    # Call the nested task in parallel
    tasks = []
    for i in range(n_items):
        tasks.append(nested_task.submit(batch_index=i))

    # Wait for all tasks to complete
    wait(tasks)

    return sum([task.result() for task in tasks])


@flow(name="nested_tasks", task_runner=AsyncEnabledTaskRunner(max_workers=10))
def nested_tasks_flow():
    """
    A flow that calls a parent task.
    """
    task = parent_task.submit()
    task_2 = parent_task_2.submit()


    return task.result(), task_2.result()


if __name__ == "__main__":
    flow = nested_tasks_flow()
    import asyncio

    asyncio.run(flow)
the wait in the first parent is blocking correct? this is what im seeing in my logs
c
yea right now
wait
is a blocking call - we have plans to support async waits (awaits ha) soon though; if your
AsyncEnabledTaskRunner
handles async better than ours, feel free to open either a PR or an issue describing the changes and we can consider incorporating it into the codebase!!
p
appreciate it thanks Chris. I think there's some performance gains i can make out perhaps but will test over the next couple of days. We're kinda more blocked by getting our worker pool set up 😅