<@ULVA73B9P> does this code work?from prefect i...
# ask-community
k
@Marvin does this code work?
from prefect import get_client
from prefect.deployments import run_deployment
import asyncio

async def trigger_flow():
    # Trigger deployment
    flow_run = await run_deployment(
        name="my_flow/my_deployment",  # flow/deployment
        parameters={"client": "S0011I"}  # your params
    )
    print("Flow run triggered:", flow_run.id)
    return flow_run.id

async def check_flow_once(flow_run_id):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        print("Flow run state:", flow_run.state_name)
        if flow_run.state_type.value == "CRASHED":
            print("Flow crashed — you can retry now.")
        elif flow_run.state_type.value == "COMPLETED":
            print("Flow completed successfully.")
        else:
            print("Flow not in a final state yet.")

async def main():
    # Step 1: Trigger
    flow_run_id = await trigger_flow()
    # --- At this point you could stop the script ---
    # Later, when you want to check:
    await check_flow_once(flow_run_id)

asyncio.run(main())
m
Short answer: yes. In Prefect 3.x this is fine, assuming:
- The deployment name exists (my_flow/my_deployment)
- You're logged in/pointed at an API
- There's a worker on the deployment's work pool to actually pick it up
A few small improvements:
- Compare against the enum instead of strings to avoid typos
- Be defensive in case state_type is None
- Consider handling other terminal states like FAILED/CANCELLED
Here's a slightly tweaked version:
from prefect import get_client
from prefect.deployments import run_deployment
from prefect.client.schemas import StateType
import asyncio

async def trigger_flow():
    flow_run = await run_deployment(
        name="my_flow/my_deployment",
        parameters={"client": "S0011I"}
    )
    print("Flow run triggered:", flow_run.id)
    return flow_run.id

async def check_flow_once(flow_run_id):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        print("Flow run state:", flow_run.state_name)

        st = flow_run.state_type
        if st == StateType.CRASHED:
            print("Flow crashed — you can retry now.")
        elif st == StateType.COMPLETED:
            print("Flow completed successfully.")
        elif st in {StateType.FAILED, StateType.CANCELLED}:
            print(f"Flow ended in {st.value}.")
        else:
            print("Flow not in a final state yet.")

async def main():
    flow_run_id = await trigger_flow()
    await check_flow_once(flow_run_id)

asyncio.run(main())
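One caveat: by default run_deployment waits for the flow run to finish (timeout=None), so the script only reaches the check once the run is done; pass timeout=0 if you want to trigger and return immediately. If the run then sits in a non-final state, your worker probably isn't running. Yes, they're like cats: they do nothing unless you watch them.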
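To make the branching on terminal states concrete, here is a minimal sketch that runs without a Prefect server. The StateType class below is a stand-in written for this example (assumption: it mirrors Prefect's enum as a str-valued enum with these member names), and describe and TERMINAL are hypothetical helper names, not part of Prefect.

```python
from enum import Enum
from typing import Optional


class StateType(str, Enum):
    """Stand-in for Prefect's StateType (assumption: a str-valued enum)."""
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"


# State types after which the run is not expected to change again.
TERMINAL = {
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CANCELLED,
    StateType.CRASHED,
}


def describe(state_type: Optional[StateType]) -> str:
    """Map a state type (possibly None) to a short status message."""
    if state_type is None or state_type not in TERMINAL:
        return "not final"
    if state_type == StateType.CRASHED:
        return "crashed, retry"
    if state_type == StateType.COMPLETED:
        return "completed"
    # FAILED and CANCELLED share one branch, as in the tweaked version.
    return f"ended in {state_type.value}"
```

With the real client you would pass flow_run.state_type to a helper like this instead of the stand-in values.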
k
@Marvin flow_run.state_type.value == "CRASHED" is this the right way to check the state_type?
m
Short answer: it works, but it's not the nicest way. Better options in Prefect 3.x:
- Prefer the enum: flow_run.state_type == StateType.CRASHED
- If you insist on strings, either compare directly (since it's a str-enum) or guard for None:
  - flow_run.state_type == "CRASHED"
  - flow_run.state_type and flow_run.state_type.value == "CRASHED"
Minimal, safe version:
from prefect.client.schemas import StateType

st = flow_run.state_type
if st == StateType.CRASHED:
    ...
Also consider other terminal states: COMPLETED, FAILED, CANCELLED. The universe, sadly, contains more than just “CRASHED”.
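If it helps to see why the enum comparison is less brittle than a string, here is a small sketch you can run locally. StateType here is a stand-in defined for the example (assumption: Prefect's enum behaves like a str-valued enum, as described above); it is not imported from Prefect.

```python
from enum import Enum


class StateType(str, Enum):
    """Stand-in for Prefect's StateType (assumption: a str-valued enum)."""
    COMPLETED = "COMPLETED"
    CRASHED = "CRASHED"


st = StateType.CRASHED

# All three spellings agree when everything is typed correctly.
enum_match = st == StateType.CRASHED
string_match = st == "CRASHED"
value_match = st.value == "CRASHED"

# A typo in a string comparison is silently False...
string_typo_match = st == "CRASHD"

# ...while a typo in the enum member name fails loudly.
try:
    StateType.CRASHD
    enum_typo_caught = False
except AttributeError:
    enum_typo_caught = True
```

The string typo just takes the wrong branch at runtime, whereas the misspelled enum member raises as soon as the line executes.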
k
@Marvin do I have to wrap it in strings?
m
No. You don’t have to (and shouldn’t) wrap it in strings if you use the enum. Best practice:
from prefect.client.schemas import StateType

if flow_run.state_type == StateType.CRASHED:
    ...
Why this is better:
- No quotes needed; you're comparing enum to enum.
- Safe if state_type is None: None == StateType.CRASHED is just False.
- Avoids brittle string typos.
If you insist on using .value, then yes, you'd compare to the string, but guard for None:
if flow_run.state_type and flow_run.state_type.value == "CRASHED":
    ...
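For the None case specifically, a quick sketch of the three variants side by side. The StateType class is again a local stand-in (assumption: a str-valued enum like Prefect's), and the is_crashed_* function names are made up for this illustration.

```python
from enum import Enum
from typing import Optional


class StateType(str, Enum):
    """Stand-in for Prefect's StateType (assumption: a str-valued enum)."""
    COMPLETED = "COMPLETED"
    CRASHED = "CRASHED"


def is_crashed_enum(state_type: Optional[StateType]) -> bool:
    # Enum comparison: None simply compares unequal.
    return state_type == StateType.CRASHED


def is_crashed_guarded(state_type: Optional[StateType]) -> bool:
    # .value comparison with a None guard in front of it.
    return bool(state_type) and state_type.value == "CRASHED"


def is_crashed_unguarded(state_type: Optional[StateType]) -> bool:
    # .value comparison without a guard: raises AttributeError on None.
    return state_type.value == "CRASHED"


try:
    is_crashed_unguarded(None)
    unguarded_raised = False
except AttributeError:
    unguarded_raised = True
```

The enum and guarded versions both return False for None; only the unguarded .value access blows up.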