Mac
11/16/2022, 4:29 PMcreate_flow_run.map()
from prefect v1 in v2? I am trying to run a few subflows at once, then have the flow wait to get successful responses from all of the subflow before moving on the the next taskMason Menges
11/16/2022, 10:19 PMMac
11/16/2022, 10:43 PMMason Menges
11/16/2022, 10:50 PMMac
11/16/2022, 10:51 PMMason Menges
11/22/2022, 5:45 PMMac
11/22/2022, 5:46 PMMason Menges
11/23/2022, 1:43 AMfrom prefect import flow, get_run_logger, get_client
from prefect.deployments import run_deployment
import asyncio
import time
@flow
async def test_flow():
logger = get_run_logger()
list_inputs = ["grass", "fire"]
results = []
for type in list_inputs:
deployed_run = await run_deployment(
name="create-pk-team/pk-flow-dev",
parameters={"pk_type": type},
flow_run_name=f"{type} create-pk-team/pk-flow-dev",
timeout=0)
results.append(deployed_run.id)
api_client = get_client()
while len(results) > 0:
<http://logger.info|logger.info>("in while loop")
for id in results:
<http://logger.info|logger.info>(f"checking flow run id: {id}")
deployed_run = await api_client.read_flow_run(id)
run_state = deployed_run.state
<http://logger.info|logger.info>(f"Run {id} Completed {run_state}")
if run_state.is_final() == True:
<http://logger.info|logger.info>("removed result")
results.remove(id)
<http://logger.info|logger.info>(results)
time.sleep(5)
<http://logger.info|logger.info>("I waited")
if __name__ == "__main__":
asyncio.run(test_flow())
Mac
11/23/2022, 2:13 PM