Thomas Hoeck
06/21/2021, 11:00 AM
Rob Fowler
06/21/2021, 11:31 AM
from prefect.utilities.graphql import EnumValue, with_args


def create_prefect_flow(prefect_client, name, parameters=None, run_name=None, project=None, version=None):
    # Look up the flow by name, version and project name.
    where_clause = {
        "_and": {
            "name": {"_eq": name},
            "version": {"_eq": version},
            "project": {"name": {"_eq": project}},
        }
    }
    query = {
        "query": {
            with_args(
                "flow",
                {
                    "where": where_clause,
                    "order_by": {
                        "name": EnumValue("asc"),
                        "version": EnumValue("desc"),
                    },
                    "distinct_on": EnumValue("name"),
                },
            ): {"id": True}
        }
    }
    result = prefect_client.graphql(query)
    if not result.data.flow:
        # FlowExcept is the poster's own exception class (definition not shown).
        raise FlowExcept(f"flow name {name} not found")
    flow_id = result.data.flow[0].id
    # create_flow_run returns the ID of the new flow run.
    return flow_id, prefect_client.create_flow_run(flow_id=flow_id,
                                                   run_name=run_name,
                                                   parameters=parameters)
Rob Fowler
06/21/2021, 11:31 AM
Kevin Kho
prefect.client.client.Client.create_flow_run. It will return the ID. prefect.tasks.prefect.flow_run.StartFlowRun.run also returns the ID if wait=False. If wait=True, you would need to write some extra code to get the ID.
Thomas Hoeck
06/22/2021, 7:34 AM
Kevin Kho
import requests
import json

query = """mutation {
  create_flow_run(input: {
    flow_id: "5ac80adc-442a-4e53-bc01-d9d65bffd6aa"}) {
    id
  }
}"""
url = 'https://api.prefect.io'
r = requests.post(url, json={'query': query}, headers={"authorization": "Bearer insert_token"})
print(r.status_code)
print(r.text)
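The request above only prints the raw response. To get the flow run ID out of it, something like the following might work. This is only a sketch: `extract_flow_run_id` is a made-up helper name (not part of Prefect), and it assumes the API answers with the usual GraphQL JSON envelope, i.e. `data.create_flow_run.id` on success or a top-level `errors` list on failure.

```python
import json


def extract_flow_run_id(response_text):
    """Pull the new flow run ID out of a create_flow_run response body.

    Hypothetical helper, not part of Prefect. Assumes the standard GraphQL
    response envelope: {"data": {...}} on success, {"errors": [...]} on failure.
    """
    payload = json.loads(response_text)

    # GraphQL servers usually report failures in a top-level "errors" list,
    # often together with an HTTP 200, so check it before reading "data".
    errors = payload.get("errors")
    if errors:
        messages = "; ".join(
            str(e["message"]) if isinstance(e, dict) and "message" in e else str(e)
            for e in errors
        )
        raise RuntimeError(f"create_flow_run failed: {messages}")

    try:
        return payload["data"]["create_flow_run"]["id"]
    except (KeyError, TypeError) as exc:
        # Missing keys, or "data" being null, means there is no ID to return.
        raise RuntimeError("response did not contain a flow run id") from exc
```

With the request above, that would be called as `flow_run_id = extract_flow_run_id(r.text)`.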
Thomas Hoeck
06/23/2021, 10:30 AM