David Michael Carter
06/17/2024, 1:44 PM
`sub_flow` to complete before processing its results in parallel, like in this example below:
from time import sleep
from datetime import datetime
from random import randint
from prefect import flow, task
# sub-flow and sub-flow task
###########################################
@task(log_prints=True)
def sub_flow_task(a_list_element):
    """Simulate a unit of work for one list element.

    Prints a start timestamp, sleeps for a random whole number of seconds
    between 1 and 10 (inclusive), prints an end timestamp, and returns the
    element unchanged.
    """
    print(f'list_element: {a_list_element}, start time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    # Random duration so that the mapped runs finish at different times.
    sleep(randint(1, 10))
    print(f'list_element: {a_list_element}, end time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    return a_list_element
@flow(log_prints=True)
def sub_flow():
    """Map ``sub_flow_task`` over a fixed four-element list.

    Returns whatever ``sub_flow_task.map`` returns (presumably a list of
    Prefect futures, one per element -- confirm against the Prefect version
    in use).
    """
    a_list = [1, 2, 3, 4]
    return sub_flow_task.map(a_list_element=a_list)
###########################################
###########################################
# main flow and main flow task
@task(log_prints=True)
def main_task(test):
    """Print the value handed over from the sub-flow; returns nothing."""
    print(f'main_task() received work from subflow, val: {test}')
@flow(log_prints=True)
def main_flow():
    """Run ``sub_flow`` and map ``main_task`` over what it returns.

    ``sub_flow()`` is called first and its return value is then passed to
    ``main_task.map``, so the mapping step only starts once that call has
    returned.
    """
    subflow_work = sub_flow()
    main_task.map(test=subflow_work)
# Run the example only when the file is executed as a script.
if __name__ == '__main__':
    main_flow()
Nate
06/17/2024, 2:46 PM
from datetime import datetime
from random import randint
from time import sleep
from typing import Generator
from prefect import flow, task
from prefect.futures import PrefectFuture
# sub-flow and sub-flow task
###########################################
@task
def sub_flow_task(a_list_element: int) -> int:
    """Simulate a unit of work for one list element.

    Prints a start timestamp, sleeps for a random whole number of seconds
    between 1 and 10 (inclusive), prints an end timestamp, and returns the
    element unchanged.
    """
    print(
        f'list_element: {a_list_element}, start time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
    )
    # Random duration so that the submitted runs finish at different times.
    sleep(randint(1, 10))
    print(
        f'list_element: {a_list_element}, end time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
    )
    return a_list_element
@flow
def sub_flow() -> Generator[PrefectFuture, None, None]:
    """Submit ``sub_flow_task`` for each element and yield each future.

    Each future is yielded as soon as its task has been submitted, rather
    than collecting them all into a list first.

    NOTE(review): a generator-style flow presumably needs a Prefect release
    that supports generator flows (3.x) -- confirm before using on 2.x.
    """
    a_list = [1, 2, 3, 4]
    for a_list_element in a_list:
        yield sub_flow_task.submit(a_list_element)
###########################################
# main flow and main flow task
@task
def main_task(test: int) -> None:
    """Print the value handed over from the sub-flow."""
    print(f"main_task() received work from subflow, val: {test}")
@flow(log_prints=True)
def main_flow() -> None:
    """Map ``main_task`` over the sub-flow's output and resolve every result.

    The value returned by ``sub_flow()`` is passed straight to
    ``main_task.map``; ``.result()`` is then called on each returned future
    (presumably blocking until that mapped run has finished -- confirm
    against the Prefect futures documentation).
    """
    subflow_work = sub_flow()
    # A plain loop rather than a throwaway list comprehension: only the
    # side effect of resolving each future matters here.
    for fut in main_task.map(subflow_work):
        fut.result()
# Run the example only when the file is executed as a script.
if __name__ == "__main__":
    main_flow()
• otherwise, with 2.x I might use async + `asyncio.as_completed`
David Michael Carter
06/17/2024, 3:07 PM
`async` + `as_completed`
(I am using `prefect==2.15.0`)
Nate
06/17/2024, 3:07 PM