Dexter Antonio
02/28/2022, 5:27 PM
Anna Geller
02/28/2022, 5:40 PM
prefect run --name yourflowname --watch
To query for flow runs started with a specific parameter value, you may use this query:
query ($param_value: jsonb) {
  flow_run(
    where: {_and: [{flow_id: {_eq: "eb45ed0a-a76c-445b-b0b8-1fdc05c1f54c"}},
                   {parameters: {_eq: $param_value}}]}
  ) {
    name
    state
    end_time
    start_time
    parameters
  }
}
with your parameter value as query variable:
{
  "param_value": {
    "x": 1
  }
}
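For reference, a minimal Python sketch of sending the query above together with its variable. It assumes the same https://api.prefect.io endpoint and Bearer-token header that appear later in this thread; the helper names `build_payload` and `fetch_flow_runs` are made up for illustration, and the response is assumed to carry the results under `data.flow_run`, mirroring the query.

```python
FLOW_RUNS_QUERY = """
query ($param_value: jsonb) {
  flow_run(
    where: {_and: [{flow_id: {_eq: "eb45ed0a-a76c-445b-b0b8-1fdc05c1f54c"}},
                   {parameters: {_eq: $param_value}}]}
  ) {
    name
    state
    end_time
    start_time
    parameters
  }
}
"""


def build_payload(param_value):
    """Return the JSON body for the GraphQL POST: the query plus its variables."""
    return {"query": FLOW_RUNS_QUERY, "variables": {"param_value": param_value}}


def fetch_flow_runs(param_value, api_key, url="https://api.prefect.io"):
    """POST the query and return the matching flow runs (assumed response shape)."""
    import requests  # imported lazily so build_payload works without it

    response = requests.post(
        url,
        json=build_payload(param_value),
        headers={"authorization": f"Bearer {api_key}"},
    )
    response.raise_for_status()
    return response.json()["data"]["flow_run"]


# Hypothetical usage, with the API key taken from an environment variable:
# runs = fetch_flow_runs({"x": 1}, os.environ["API_KEY"])
```

Passing the parameter value as a query variable, rather than formatting it into the query string, keeps the `jsonb` comparison free of quoting problems.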
Regarding the various ways to start a parametrized flow run, check out this blog post, which discusses the topic in detail: https://www.prefect.io/blog/how-to-make-your-data-pipelines-more-dynamic-using-parameters-in-prefect/
But I'm happy to answer any specific questions you may have.
Dexter Antonio
02/28/2022, 5:52 PM
os.system?
Kevin Kho
02/28/2022, 5:54 PM
create_flow_run
task and pass in the parameters to accomplish 1 and 3 from your initial questions?
Dexter Antonio
02/28/2022, 5:56 PM
Kevin Kho
02/28/2022, 5:56 PM
Client.create_flow_run
that is not a task, and a task can be invoked with task.run()
so you can do create_flow_run.run()
to use the Python under the hood
Dexter Antonio
02/28/2022, 5:59 PM
client.create_flow_run(id_of_flow)
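The two options discussed above could be sketched roughly as follows. This assumes Prefect 1.x, where `Client` is importable from `prefect` and the `create_flow_run` task lives in `prefect.tasks.prefect`; the wrapper names `start_run_with_client` and `start_run_with_task` and the optional `client` argument are illustrative only, not part of Prefect.

```python
def start_run_with_client(flow_id, parameters, client=None):
    """Create a flow run via the API client and return the new flow run ID."""
    if client is None:
        # Assumes Prefect 1.x is installed and authenticated.
        from prefect import Client

        client = Client()
    return client.create_flow_run(flow_id=flow_id, parameters=parameters)


def start_run_with_task(flow_id, parameters):
    """Invoke the create_flow_run task's underlying function with .run()."""
    # Assumes Prefect 1.x, where this task lives in prefect.tasks.prefect.
    from prefect.tasks.prefect import create_flow_run

    return create_flow_run.run(flow_id=flow_id, parameters=parameters)


# Hypothetical usage (flow ID and parameter value are placeholders):
# run_id = start_run_with_client("id_of_flow", {"x": 6})
```

Accepting an optional `client` makes it possible to try the wrapper with a stand-in object before pointing it at a real backend.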
Anna Geller
02/28/2022, 6:02 PM
import os
import requests

# The API key and flow version group ID are read from the environment
API_KEY = os.environ["API_KEY"]
FLOW_VERSION_GROUP_ID = os.environ["FLOW_VERSION_GROUP_ID"]
create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""
# Start a flow run with parameter x=6
inputs = dict(versionGroupId=FLOW_VERSION_GROUP_ID, parameters=dict(x=6))
response = requests.post(
    url="https://api.prefect.io",
    json=dict(query=create_mutation, variables=dict(input=inputs)),
    headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
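To pull the new flow run ID out of the response instead of printing the raw text, something like the sketch below might work. It assumes the decoded body follows the usual GraphQL layout, with results under `data` in the same shape as the mutation's selection and any failures under `errors`; `extract_flow_run_id` and the sample ID are hypothetical.

```python
def extract_flow_run_id(body):
    """Return the flow run ID from a decoded createFlowRun response body."""
    if body.get("errors"):
        # GraphQL servers typically report failures in a top-level "errors" list.
        raise RuntimeError(f"GraphQL request failed: {body['errors']}")
    return body["data"]["createFlowRun"]["flow_run"]["id"]


# Made-up sample body in the assumed shape; with the real request above,
# pass response.json() instead.
sample = {"data": {"createFlowRun": {"flow_run": {"id": "hypothetical-run-id"}}}}
print(extract_flow_run_id(sample))
```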
Dexter Antonio
02/28/2022, 6:28 PM