I started using prefect cloud to manage my workflo...
# prefect-community
d
I started using prefect cloud to manage my workflows. I like the UI but I would like to be able to perform prefect cloud tasks with python code. Specifically I am attempting to do the following three things: 1. Start a flow, which I registered on prefect cloud with Python code 2. Fetch the parameters used to run a flow from prefect cloud with Python code/GraphCL 3. Change the parameters and then rerun the flow The documentation on how to achieve these tasks is a little sparse, but I think it is possible. How can I go about achieving these goals?
a
To start a registered flow run, you can use the Prefect CLI e.g.
Copy code
prefect run --name yourflowname --watch
To query for flow runs started with a specific parameter value, you may use this query:
Copy code
query ($param_value: jsonb) {
  flow_run(
    where: {_and: [{flow_id: {_eq: "eb45ed0a-a76c-445b-b0b8-1fdc05c1f54c"}}, 
      {parameters: {_eq: $param_value}}]}
  ) {
    name
    state
    end_time
    start_time
    parameters
  }
}
with your parameter value as query variable:
Copy code
{
  "param_value": {
    "x": 1
  }
}
Regarding various ways to start a parametrized flow run, check out this blog post which discusses this topic in detail https://www.prefect.io/blog/how-to-make-your-data-pipelines-more-dynamic-using-parameters-in-prefect/ but happy to answer any specific question you may have
d
thanks, I will try this out! Is there a more natural way to use the CLI with python, or should I just execute the commands with
os.system?
k
Am I missing something? Or can you use the
create_flow_run
task and pass in the parameters to accomplish 1 and 3 from your initial questions?
d
I think I have to call create_flow_run within a Flow context block.
I guess I could create a local flow that runs remote flows. I will try that.
k
There is
Client.create_flow_run
that is not a task, and a task can be invoked with
task.run()
so you can do
create_flow_run.run()
to use the Python under the hood
d
thanks Kevin, that works great. I am able to start flows with
client.create_flow_run(id_of_flow)
a
the blog post I mentioned shows that actually 🙂 there is even an example how you can start a parametrized flow run using Python request library
Copy code
import os
import requests

API_KEY = os.environ["API_KEY"]
FLOW_VERSION_GROUP_ID = os.environ["FLOW_VERSION_GROUP_ID"]

create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""

inputs = dict(versionGroupId=FLOW_VERSION_GROUP_ID, parameters=dict(x=6))
response = <http://requests.post|requests.post>(
    url="<https://api.prefect.io>",
    json=dict(query=create_mutation, variables=dict(input=inputs)),
    headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
d
Thanks this is also a cool way to do it.
👍 1