Ben Muller
12/02/2022, 3:35 AM
import time
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
@flow
def run():
    """Trigger the "dro-gallops-fields/default" deployment and log its state every 5 seconds.

    NOTE(review): ``x`` is never reassigned inside the loop and the loop has
    no exit condition. Unless the object returned by ``run_deployment``
    refreshes itself, every iteration logs the state it carried when the call
    returned, and the flow never finishes.
    """
    # The idempotency key is the current Unix time truncated to whole seconds.
    # NOTE(review): timeout=0 presumably makes run_deployment return without
    # waiting for the triggered run to finish -- confirm against the Prefect docs.
    x = run_deployment(name="dro-gallops-fields/default", idempotency_key=str(int(time.time())), timeout=0)
    while x:
        time.sleep(5)
        get_run_logger().info(x.state)
        get_run_logger().info(x.state_name)
        get_run_logger().info(x.state_type)
        get_run_logger().info(x.state.message)
        # NOTE(review): `result` is referenced without being called; if it is a
        # method on the state object this logs the bound method, not a value -- confirm.
        get_run_logger().info(x.state.result)


if __name__ == "__main__":
    run()
This does everything I expect in Prefect Cloud, but the logs on the parent flow "run" show something like this (forever - it never exits the Scheduled state and never updates in line with its actual state?):
While the subflow run has the flow marked as Complete
- is there a different way to ping the state in Prefect 2?
Mason Menges
12/02/2022, 4:39 PM
from prefect import flow, get_run_logger, get_client
from prefect.deployments import run_deployment
import asyncio
import time
@flow
async def test_flow():
    """Trigger one deployment run per input, then poll until every run is final.

    A run of the "create-pk-team/pk-flow-dev" deployment is started for each
    entry in ``list_inputs`` with ``timeout=0``, and the id of each returned
    flow run is collected. The flow then re-reads every outstanding run
    through the API client every 5 seconds and drops the ones whose state
    reports ``is_final()``, returning once none are left.
    """
    logger = get_run_logger()
    list_inputs = ["grass", "fire"]
    results = []
    for pk_type in list_inputs:
        deployed_run = await run_deployment(
            name="create-pk-team/pk-flow-dev",
            parameters={"pk_type": pk_type},
            flow_run_name=f"{pk_type} create-pk-team/pk-flow-dev",
            timeout=0,
        )
        results.append(deployed_run.id)

    # Use the client as an async context manager so it is closed on exit.
    async with get_client() as api_client:
        while results:
            logger.info("in while loop")
            # Iterate over a copy: removing from the list being iterated
            # would skip the element that follows each removed id.
            for run_id in list(results):
                logger.info(f"checking flow run id: {run_id}")
                # Re-read the run on every pass to get its current state.
                deployed_run = await api_client.read_flow_run(run_id)
                run_state = deployed_run.state
                logger.info(f"Run {run_id} Completed {run_state}")
                if run_state.is_final():
                    logger.info("removed result")
                    results.remove(run_id)
                    logger.info(results)
            # Yield to the event loop while waiting instead of blocking it.
            await asyncio.sleep(5)
            logger.info("I waited")


if __name__ == "__main__":
    asyncio.run(test_flow())
Ben Muller
12/02/2022, 7:21 PMMason Menges
12/02/2022, 7:39 PMBen Muller
12/02/2022, 8:10 PMMason Menges
12/02/2022, 9:25 PMBen Muller
12/02/2022, 9:31 PM