datamongus
12/05/2022, 12:16 AM
Mason Menges
12/05/2022, 7:23 PM
from prefect import flow, get_run_logger, get_client
from prefect.deployments import run_deployment
import asyncio
import time
@flow
async def test_flow():
    """Start one ``pk-flow-dev`` deployment run per type and wait for them all.

    For each entry in a fixed list of types, a run of the
    ``create-pk-team/pk-flow-dev`` deployment is triggered with that type as
    its ``pk_type`` parameter. The flow then polls the API for each run's
    state until every run reports a final state.
    """
    logger = get_run_logger()
    poll_interval_seconds = 5
    pk_types = ["grass", "fire"]

    # Trigger every child run first so they can execute concurrently.
    pending_run_ids = []
    for pk_type in pk_types:
        deployed_run = await run_deployment(
            name="create-pk-team/pk-flow-dev",
            parameters={"pk_type": pk_type},
            flow_run_name=f"{pk_type} create-pk-team/pk-flow-dev",
            # timeout=0 is intended to return right after the run is created
            # instead of waiting for it (see the run_deployment docs).
            timeout=0,
        )
        pending_run_ids.append(deployed_run.id)

    # Enter the client as a context manager so it is closed when polling ends.
    async with get_client() as api_client:
        while pending_run_ids:
            logger.info("in while loop")
            # Iterate over a snapshot: removing from the list being iterated
            # would skip the element that follows each removed one.
            for run_id in list(pending_run_ids):
                logger.info(f"checking flow run id: {run_id}")
                deployed_run = await api_client.read_flow_run(run_id)
                run_state = deployed_run.state
                logger.info(f"Run {run_id} Completed {run_state}")
                if run_state.is_final():
                    logger.info("removed result")
                    pending_run_ids.remove(run_id)
                    logger.info(pending_run_ids)
            if pending_run_ids:
                # Non-blocking wait: time.sleep() would stall the event loop.
                await asyncio.sleep(poll_interval_seconds)
                logger.info("I waited")
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(test_flow())
TLDR: This is likely doable, but exactly how you manage it depends on your use case. The run_deployment command is a useful tool here, as it lets you run deployments from a parent flow with different arguments as needed.