Naren K
01/19/2024, 12:39 PM
from prefect import flow
import asyncio
from prefect.deployments import run_deployment
from prefect.logging import get_logger
import time
# Module-level logger named "example"; every flow in this file logs through it.
# NOTE(review): Prefect documents get_run_logger() for logging from inside a
# flow run — confirm that get_logger() output ends up where it is expected.
logger = get_logger("example")
@flow(name="subflow")
def subflow():
    """Log the integers 1 through 9, pause for 10 seconds, and return a greeting."""
    pause_seconds = 10

    # Emit one log record per counter value before pausing.
    for counter in range(1, 10):
        logger.info(counter)

    logger.info("Sleeping for 10 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow!"
    return greeting
@flow(name="subflow1")
def subflow1():
    """Log the integers 1 through 19, pause for 20 seconds, and return a greeting."""
    pause_seconds = 20

    # Emit one log record per counter value before pausing.
    for counter in range(1, 20):
        logger.info(counter)

    logger.info("Sleeping for 20 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow1!"
    return greeting
@flow(name="subflow2")
def subflow2():
    """Pause for 30 seconds, logging before and after, and return a greeting."""
    pause_seconds = 30

    logger.info("Sleeping for 30 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow2!"
    return greeting
@flow(name="subflow3")
def subflow3():
    """Log the integers 1 through 19, pause for 40 seconds, and return a greeting."""
    pause_seconds = 40

    # Emit one log record per counter value before pausing.
    for counter in range(1, 20):
        logger.info(counter)

    logger.info("Sleeping for 40 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow3!"
    return greeting
@flow(name="subflow4")
def subflow4():
    """Log the integers 1 through 19, pause for 50 seconds, and return a greeting."""
    pause_seconds = 50

    # Emit one log record per counter value before pausing.
    for counter in range(1, 20):
        logger.info(counter)

    logger.info("Sleeping for 50 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow4!"
    return greeting
@flow(name="subflow5")
def subflow5():
    """Log the integers 1 through 19, pause for 60 seconds, and return a greeting."""
    pause_seconds = 60

    # Emit one log record per counter value before pausing.
    for counter in range(1, 20):
        logger.info(counter)

    logger.info("Sleeping for 60 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow5!"
    return greeting
@flow(name="subflow6")
def subflow6():
    """Log the integers 1 through 19, pause for 70 seconds, and return a greeting."""
    pause_seconds = 70

    # Emit one log record per counter value before pausing.
    for counter in range(1, 20):
        logger.info(counter)

    logger.info("Sleeping for 70 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow6!"
    return greeting
@flow(name="subflow7")
def subflow7():
    """Log the integers 1 through 19, pause for 80 seconds, and return a greeting."""
    pause_seconds = 80

    # Emit one log record per counter value before pausing.
    for counter in range(1, 20):
        logger.info(counter)

    logger.info("Sleeping for 80 second...")
    time.sleep(pause_seconds)
    logger.info("Awake now!")

    greeting = "Hello from subflow7!"
    return greeting
@flow
async def master():
    """Trigger the subflow deployments in sequential rounds.

    Every round hands its ``run_deployment`` calls to ``asyncio.gather`` and
    awaits the result before the next round begins, so rounds run strictly
    one after another.  The gathered result of each round after the initial
    one is logged under a label that matches its position in the sequence.

    Returns:
        None.
    """
    # Initial round: awaited for sequencing only; its result is not logged.
    await asyncio.gather(
        run_deployment(name="subflow/subflow"),
    )

    first_run = await asyncio.gather(
        run_deployment(name="subflow1/subflow1"),
        run_deployment(name="subflow2/subflow2"),
    )
    logger.info(f"First round result: {first_run}")

    second_run = await asyncio.gather(
        run_deployment(name="subflow3/subflow3"),
    )
    logger.info(f"Second round result: {second_run}")

    third_run = await asyncio.gather(
        run_deployment(name="subflow4/subflow4"),
    )
    logger.info(f"Third round result: {third_run}")

    # Previously re-bound ``third_run``; it is its own round and gets its own name.
    fourth_run = await asyncio.gather(
        run_deployment(name="subflow5/subflow5"),
        run_deployment(name="subflow6/subflow6"),
    )
    logger.info(f"Fourth round result: {fourth_run}")

    fifth_run = await asyncio.gather(
        run_deployment(name="subflow7/subflow7"),
    )
    logger.info(f"Fifth round result: {fifth_run}")