# Kiran
# 03/24/2025, 10:16 AM

# Priority bands: (accepted numbers, work queue, ordinal used in the log line).
_QUEUE_BANDS = (
    (range(1, 4), "high", "first"),
    (range(4, 7), "medium", "second"),
    (range(7, 11), "low", "third"),
)


def _select_band(number):
    """Return ``(work_queue, ordinal)`` for *number*, or ``None`` outside 1-10."""
    for accepted, work_queue, ordinal in _QUEUE_BANDS:
        if number in accepted:
            return work_queue, ordinal
    return None


@flow(
    log_prints=True,
    name="highpriority-triggering-flow",
    flow_run_name="highpriority-triggering-flow-run",
)
async def highpriority_first_triggeringflow():
    """Trigger one deployment run per number listed in ``calc_file.txt``.

    Each non-blank line of ``calc_file.txt`` (resolved against the current
    working directory) is paired by position with a fixed list of ten flow
    run names. Pairing stops at the shorter of the two sequences, and because
    the pairs are stored in a dict, a repeated line keeps only the last name
    paired with it.

    The integer value of each line selects the work queue:
    1-3 -> "high", 4-6 -> "medium", 7-10 -> "low". Values outside 1-10 are
    logged and skipped. Every selected entry results in a ``run_deployment``
    call for ``main-flow/highpriority-maindeployment`` with the original
    (string) line passed as the ``num`` parameter; the collected results are
    then awaited together.

    NOTE(review): ``flow``, ``run_deployment`` and ``asyncio`` must be
    imported by the surrounding module; the imports are not visible in this
    snippet. ``run_deployment`` is expected to return an awaitable when
    called from this coroutine -- confirm against the installed Prefect
    version.

    Raises:
        OSError: if ``calc_file.txt`` cannot be opened.
        ValueError: if a non-blank line is not an integer. This is raised
            before any deployment run is requested.
    """
    with open("calc_file.txt", "r", encoding="utf-8") as file:
        stripped_lines = [line.strip() for line in file]
    # Blank lines carry no number; drop them so int() below never sees "".
    expressions = [text for text in stripped_lines if text]

    flow_names = [
        "first-flow",
        "second-flow",
        "third-flow",
        "fourth-flow",
        "fifth-flow",
        "sixth-flow",
        "seventh-flow",
        "eighth-flow",
        "ninth-flow",
        "tenth-flow",
    ]
    dictionary = dict(zip(expressions, flow_names))
    print(f"the created dictionary is {dictionary}")

    # Convert every entry up front so a malformed line fails the flow before
    # any run_deployment call has been made (no half-built batch of
    # un-awaited coroutines is left behind).
    planned = []
    for num, name in dictionary.items():
        try:
            planned.append((num, name, int(num)))
        except ValueError as err:
            raise ValueError(
                f"calc_file.txt entry {num!r} (paired with {name}) is not an integer"
            ) from err

    flow_tasks = []
    for num, name, number in planned:
        print(f"the number is {num}")
        print(f"the name is {name}")
        band = _select_band(number)
        if band is None:
            print(f"skipping {name}: number {num} is outside the range 1-10")
            continue
        work_queue, ordinal = band
        print(f"triggering flow run {name} successfully")
        print(f"entered {ordinal} if condition, number is {num}")
        flow_tasks.append(
            run_deployment(
                name="main-flow/highpriority-maindeployment",  # Deployment Name
                parameters={"num": num},
                flow_run_name=name,
                work_queue_name=work_queue,
            )
        )

    print(f"flow tasks are {flow_tasks}")
    # Await all requested runs concurrently.
    await asyncio.gather(*flow_tasks)
    print("All flows triggered successfully!")
if __name__ == "__main__":
    # Script entry point: run the async triggering flow to completion on a
    # fresh event loop.
    asyncio.run(highpriority_first_triggeringflow())