Ben Muller

12/02/2022, 3:35 AM
strange one here - I was trying to investigate the state object just by playing around with it in a dummy flow, like so:
import time

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment


@flow
def run():
    x = run_deployment(name="dro-gallops-fields/default", idempotency_key=str(int(time.time())), timeout=0)

    while x:
        time.sleep(5)

        get_run_logger().info(x.state)
        get_run_logger().info(x.state_name)
        get_run_logger().info(x.state_type)
        get_run_logger().info(x.state.message)
        get_run_logger().info(x.state.result)


if __name__ == "__main__":
    run()
This does everything I expect in Prefect Cloud, but the logs on the parent flow "run" show something like this (forever; it never exits the Scheduled state or updates in line with its actual state?), while the subflow run has the flow marked as Complete. Is there a different way to ping the state in Prefect 2?

Mason Menges

12/02/2022, 4:39 PM
Hey @Ben Muller, when you trigger run_deployment with timeout set to 0, it returns the flow run object as it was when the run was triggered, i.e. it isn't actually querying the API to check the new state; it's more like a snapshot of the flow run object at the time it was triggered. I wrote a similar example for this earlier that might help 😄
import asyncio

from prefect import flow, get_client, get_run_logger
from prefect.deployments import run_deployment


@flow
async def test_flow():
    logger = get_run_logger()

    list_inputs = ["grass", "fire"]

    # Trigger each deployment without waiting (timeout=0) and keep the run IDs.
    results = []
    for pk_type in list_inputs:
        deployed_run = await run_deployment(
            name="create-pk-team/pk-flow-dev",
            parameters={"pk_type": pk_type},
            flow_run_name=f"{pk_type} create-pk-team/pk-flow-dev",
            timeout=0,
        )
        results.append(deployed_run.id)

    async with get_client() as api_client:
        while len(results) > 0:
            logger.info("in while loop")
            # Iterate over a copy so finished runs can be removed safely.
            for run_id in list(results):
                logger.info(f"checking flow run id: {run_id}")
                # Re-read the flow run from the API to get its current state.
                deployed_run = await api_client.read_flow_run(run_id)
                run_state = deployed_run.state
                logger.info(f"Run {run_id} state: {run_state}")
                if run_state.is_final():
                    logger.info("removed result")
                    results.remove(run_id)
                    logger.info(results)
            # Non-blocking sleep so the event loop is not stalled.
            await asyncio.sleep(5)

    logger.info("I waited")


if __name__ == "__main__":
    asyncio.run(test_flow())

Ben Muller

12/02/2022, 7:21 PM
Thanks @Mason Menges, that's super helpful 👐 Are there any plans to make this more user-friendly, like it was in Prefect 1? I'm planning on wrapping a task around this whole thing anyway, but thought I'd ask.

Mason Menges

12/02/2022, 7:39 PM
I don't know of anything specific personally. That's not to say there definitely aren't any plans; I just don't know of any upcoming changes around accessing the flow run state. That said, I know there have been some conversations around this, so it could be worth opening a feature request if you have specific thoughts on what might be useful here beyond just the API call to update the state object. 😄

Ben Muller

12/02/2022, 8:10 PM
It's no big deal. I'll share my wrapper in case anyone else finds it useful. Thanks Mason
Btw, does your code sample need to be async? Or can the same be achieved without?

Mason Menges

12/02/2022, 9:25 PM
The same can be achieved without it; the async version is just so you can run the subflows concurrently.
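
For illustration, a minimal synchronous sketch of the same polling idea. `wait_for_final_state` and `read_flow_run_sync` are hypothetical helper names, not Prefect APIs. The adapter assumes Prefect 2's `get_client()` and `read_flow_run` as used in the sample above, and that no event loop is already running in the calling thread.

```python
import time
from typing import Any, Callable


def wait_for_final_state(
    read_flow_run: Callable[[Any], Any],
    flow_run_id: Any,
    poll_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll until the flow run reaches a final state, then return that state.

    `read_flow_run` is any synchronous callable that returns a *fresh* flow
    run object for the given ID (hypothetical adapter, see below).
    """
    while True:
        # Re-read the run each time; the object from run_deployment is a snapshot.
        state = read_flow_run(flow_run_id).state
        if state is not None and state.is_final():
            return state
        sleep(poll_seconds)


def read_flow_run_sync(flow_run_id: Any) -> Any:
    """Assumed adapter: call Prefect's async client from synchronous code."""
    import asyncio

    from prefect import get_client  # imported lazily; requires Prefect 2

    async def _read() -> Any:
        async with get_client() as client:
            return await client.read_flow_run(flow_run_id)

    # Assumption: the calling thread has no running event loop.
    return asyncio.run(_read())
```

Inside a synchronous flow this could be called as `wait_for_final_state(read_flow_run_sync, x.id)`, where `x` is the flow run returned by `run_deployment(..., timeout=0)`. Runs are then awaited one at a time rather than concurrently.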

Ben Muller

12/02/2022, 9:31 PM
Ah right, I was thinking about wrapping each run deployment in a task for a nicer implementation?
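
One possible shape for such a wrapper, as a sketch only. `trigger_and_wait` is a hypothetical name, and the `run_deployment_fn` parameter exists purely so the helper can be exercised without Prefect. It relies on the assumption that `run_deployment` with `timeout=None` polls until the triggered run finishes and that it can be called synchronously from the wrapping code.

```python
from typing import Any, Callable, Dict, Optional


def trigger_and_wait(
    deployment_name: str,
    parameters: Optional[Dict[str, Any]] = None,
    run_deployment_fn: Optional[Callable[..., Any]] = None,
) -> Any:
    """Trigger a deployment, block until it finishes, and return the flow run."""
    if run_deployment_fn is None:
        # Imported lazily so the helper can be tried without Prefect installed.
        from prefect.deployments import run_deployment as run_deployment_fn

    # Assumption: timeout=None makes run_deployment wait for a final state.
    flow_run = run_deployment_fn(
        name=deployment_name, parameters=parameters, timeout=None
    )

    state = flow_run.state
    if state is None or not state.is_completed():
        # Surface failed, crashed, or cancelled runs to the caller.
        raise RuntimeError(f"{deployment_name} finished in state {state!r}")
    return flow_run
```

With Prefect installed, this function could be wrapped with the `task` decorator and called once per deployment from the parent flow, so each triggered run shows up as its own task.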