If I am mapping the result of a subflow over a task...
# ask-community
d
If I am mapping the result of a subflow over a task in the parent flow, how can I stream in the subflow results for processing as they become ready, instead of waiting for the entirety of `sub_flow` to complete before processing its results in parallel, as in the example below?
Copy code
from time import sleep
from datetime import datetime
from random import randint
from prefect import flow, task

# sub-flow and sub-flow task
###########################################
@task(log_prints=True)
def sub_flow_task(a_list_element):
    """Simulate a unit of work for one list element.

    Prints a start timestamp, sleeps for a random whole number of seconds
    between 1 and 10 (inclusive), prints an end timestamp, and returns the
    element it was given, unchanged.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    started_at = datetime.now().strftime(timestamp_format)
    print(f'list_element: {a_list_element}, start time: {started_at}')

    # Random delay so that elements finish at different times.
    sleep(randint(1, 10))

    finished_at = datetime.now().strftime(timestamp_format)
    print(f'list_element: {a_list_element}, end time: {finished_at}')
    return a_list_element


@flow(log_prints=True)
def sub_flow():
    """Map ``sub_flow_task`` over the fixed list ``[1, 2, 3, 4]``.

    Returns whatever ``sub_flow_task.map`` produces for that list.
    """
    elements = [1, 2, 3, 4]
    mapped = sub_flow_task.map(a_list_element=elements)
    return mapped


###########################################
###########################################
# main flow and main flow task
@task(log_prints=True)
def main_task(test):
    """Print the value received from the subflow."""
    message = f'main_task() received work from subflow, val: {test}'
    print(message)


@flow(log_prints=True)
def main_flow():
    """Call ``sub_flow`` and map ``main_task`` over what it returned.

    ``sub_flow()`` is evaluated first, so the mapping only begins once that
    call has returned.
    """
    main_task.map(test=sub_flow())


# Run the parent flow only when this file is executed directly as a script.
if __name__ == '__main__':
    main_flow()
n
hi @David Michael Carter - what version of prefect are you using? if you're using the 3.x rc you can use generators here
Copy code
from datetime import datetime
from random import randint
from time import sleep
from typing import Generator

from prefect import flow, task
from prefect.futures import PrefectFuture


# sub-flow and sub-flow task
###########################################
@task
def sub_flow_task(a_list_element: int) -> int:
    """Simulate a unit of work for one list element.

    Prints a start timestamp, sleeps for a random whole number of seconds
    between 1 and 10 (inclusive), prints an end timestamp, and returns the
    element it was given, unchanged.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"

    began = datetime.now().strftime(stamp_format)
    print(f"list_element: {a_list_element}, start time: {began}")

    # Random delay so that elements finish at different times.
    sleep(randint(1, 10))

    ended = datetime.now().strftime(stamp_format)
    print(f"list_element: {a_list_element}, end time: {ended}")
    return a_list_element


@flow
def sub_flow() -> Generator[PrefectFuture, None, None]:
    """Yield one submitted ``sub_flow_task`` future per element of 1..4.

    Each ``sub_flow_task.submit(...)`` call happens only when the consumer
    asks for the next item, because this is a generator function.
    """
    for element in (1, 2, 3, 4):
        yield sub_flow_task.submit(element)


###########################################
# main flow and main flow task
@task
def main_task(test: int) -> None:
    """Print the value received from the subflow."""
    message = f"main_task() received work from subflow, val: {test}"
    print(message)


@flow(log_prints=True)
def main_flow() -> None:
    """Map ``main_task`` over the items produced by ``sub_flow``.

    Calls ``.result()`` on every mapped future in turn; the returned values
    are discarded.
    """
    mapped_futures = main_task.map(sub_flow())
    for mapped_future in mapped_futures:
        mapped_future.result()


# Run the parent flow only when this file is executed directly as a script.
if __name__ == "__main__":
    main_flow()
• otherwise, with 2.x I might use async + `asyncio.as_completed`
thank you 1
d
@Nate thanks for the feedback, I think I have found a solution with `async` + `as_completed` (I am using `prefect==2.15.0`).
n
👍