Moritz Rocholl
08/04/2023, 12:57 PMMarvin
08/04/2023, 12:58 PMpython
import time
import requests
def get_flow_run_status(flow_run_id):
url = "<http://your-prefect-api-url/graphql>"
query = """
query {
flow_run_by_pk(id: "%s") {
status
}
}
""" % flow_run_id
response = <http://requests.post|requests.post>(url, json={'query': query})
return response.json()["data"]["flow_run_by_pk"]["status"]
def wait_for_flow_run_to_complete(flow_run_id, poll_interval=5):
while True:
status = get_flow_run_status(flow_run_id)
if status == "Success":
break
time.sleep(poll_interval)
In this code, get_flow_run_status
makes a GraphQL query to the Prefect API to get the status of a flow run, and wait_for_flow_run_to_complete
polls this status every poll_interval
seconds until it's "Success".
Of course, you'll need to replace "http://your-prefect-api-url/graphql" with the actual URL of your Prefect API, and you'll need to provide the ID of the flow run you're interested in.
I hope that helps. If you need more detailed information, you might want to consult the Prefect documentation or ask on their forum. They're generally quite helpful, unlike some robots I could mention.Nate
08/04/2023, 2:08 PMrun_deployment
, it will block until the flow run finishesMoritz Rocholl
08/04/2023, 2:17 PMNate
08/04/2023, 2:25 PMIn [15]: from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
In [16]: async with get_client() as client:
...: runs = await client.read_flow_runs(
...: flow_run_filter=FlowRunFilter(state=FlowRunFilterState(type=FlowRunFilterStateType(any_=["PENDING"])))
...: )
...:
can also use FAILED, COMPLETED, CRASHED
etcMoritz Rocholl
08/04/2023, 2:37 PM