# ask-community
h
Hi, I have a flow that was registered with a local Prefect server. If I invoke `flow.run()`, it runs in my notebook instead of on the registered server with the local agent. I could not find in the docs how to trigger the execution of the registered flow using the local server. Could you point me to an example or docs (which I likely missed)? What I am looking for is a way to execute the registered flow on my server via Python.
k
Hey @Hugo Slepicka, a couple of ways. You can go to the server UI and trigger the flow run there. From your notebook, you can use the Prefect `Client` and call `Client.create_flow_run`. There are also two tasks, `StartFlowRun` and `create_flow_run`, that can be found in the task library.
These are Tasks, but tasks can be invoked directly by calling their `run()` method.
You can also look at how the call was made in the source code if you want.
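For the notebook route, a minimal sketch of triggering a registered flow through the `Client` might look like the following. It assumes Prefect 1.x with the backend pointed at the local server; `trigger_registered_flow` is a hypothetical helper name, and the flow ID is assumed to be the value that `flow.register()` returned.

```python
from typing import Any, Dict, Optional


def trigger_registered_flow(
    flow_id: str,
    parameters: Optional[Dict[str, Any]] = None,
    client: Any = None,
) -> str:
    """Ask the Prefect backend to schedule a run of an already registered flow.

    Returns the ID of the new flow run; an agent picks the run up and executes it.
    """
    if client is None:
        # Imported lazily so the helper can also be exercised with a stand-in client.
        from prefect import Client

        client = Client()
    return client.create_flow_run(flow_id=flow_id, parameters=parameters)
```

In a notebook this would be called as `flow_run_id = trigger_registered_flow(flow_id)`. Unlike `flow.run()`, the call only schedules the run and returns its ID; the local agent does the execution.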
h
So I would replace my:
state = flow.run(executor=executor)
with:
from prefect.tasks.prefect.flow_run import StartFlowRun
state = StartFlowRun().run(flow_name="Distgen->Astra&Impact", project_name="lume-orchestration")
?
k
That will work yeah
h
@Kevin Kho and how can I monitor it for finished status and fetch results back?
The run succeeded with the StartFlowRun task. Now I just have that last question: how to check the state while it runs, so I can choose whether to wait for the run to finish, and how to fetch the results back. Thank you!
k
So that is two separate things. To wait for the state, you can pass `wait=True`. For fetching results back, you need to use the GraphQL API. A while ago I mentioned there is also a `create_flow_run` task. There are three tasks that go together: `create_flow_run`, `wait_for_flow_run`, and `get_task_run_result`. All are in the task library. `create_flow_run` and `wait_for_flow_run` were meant to decouple `StartFlowRun(…, wait=True)`. Basically, to fetch results you want `get_task_run_result`.
And then you want to use `task.run()` for these if they are outside of a flow
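The three tasks can be chained outside of a flow roughly as sketched below. This assumes Prefect 1.x, where all three live in `prefect.tasks.prefect.flow_run`; `run_and_fetch` is a hypothetical helper name. Supplying a logger through `prefect.context` is a precaution, on the assumption that these tasks read their logger from the context, which is normally only populated inside a flow run.

```python
from typing import Any


def run_and_fetch(flow_name: str, project_name: str, task_slug: str) -> Any:
    """Start a registered flow, block until it finishes, return one task's result."""
    # Prefect 1.x imports, kept inside the function so nothing touches the
    # backend until the helper is actually called.
    import prefect
    from prefect.tasks.prefect.flow_run import (
        create_flow_run,
        get_task_run_result,
        wait_for_flow_run,
    )
    from prefect.utilities.logging import get_logger

    # Assumption: the tasks look up a logger in prefect.context, so one is
    # provided explicitly because no flow run is active here.
    with prefect.context(logger=get_logger("notebook")):
        # Schedule the run on the backend; returns the flow run ID.
        flow_run_id = create_flow_run.run(
            flow_name=flow_name, project_name=project_name
        )
        # Block until the flow run reaches a finished state.
        wait_for_flow_run.run(flow_run_id)
        # Load the result of the task identified by its slug.
        return get_task_run_result.run(flow_run_id, task_slug)
```

Keeping the steps separate means the wait can be skipped or postponed, which is the decoupling from `StartFlowRun(…, wait=True)` described above.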
h
@Kevin Kho what is the task_slug?
k
The slug is the task's identifier. Because a task can be used multiple times in a flow, the slug often includes a number to tell the invocations apart. If you click into the task in the UI, you might find the slug.
Basically the identifier for the task that has the results you want to pull
You can also specify it when you create the task. Docs
h
In the case of this screenshot, the `task_slug` values would be `brainy-ocelot` and so on?
k
Yes I think those are the slugs
h
Thanks for the help. I ended up going with this route here:
from prefect.backend import FlowRunView
flow_run = FlowRunView.from_flow_run_id(state)
all_tasks = flow_run.get_all_task_runs()
g, a, i = [all_tasks[x].get_result() for x in range(3)]
Here is what I was doing: https://github.com/hhslepicka/lume-prefect-demo/blob/main/lume-flow-server.ipynb Let me know if you have suggestions! Thanks again for the guidance.
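Building on the `FlowRunView` route, the sketch below shows one possible way to wait for the run to finish and then look results up by task slug rather than by list position. Both helper names are hypothetical. It assumes Prefect 1.x, where a `FlowRunView` exposes a `state` with `is_finished()` and each task run view has `task_slug` and `get_result()`, and it assumes the flow has no mapped tasks, since mapped runs share a slug.

```python
import time
from typing import Any, Callable, Dict, Iterable


def wait_until_finished(
    fetch_view: Callable[[], Any],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
) -> Any:
    """Poll the backend until the flow run's state is finished; return the last view."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        view = fetch_view()  # fetch a fresh snapshot of the flow run
        if view.state.is_finished():
            return view
        if time.monotonic() >= deadline:
            raise TimeoutError("flow run did not finish in time")
        time.sleep(poll_seconds)


def results_by_slug(task_runs: Iterable[Any]) -> Dict[str, Any]:
    """Map each task run's slug to its result instead of relying on list order."""
    return {run.task_slug: run.get_result() for run in task_runs}
```

With the flow run ID in hand, usage would be along the lines of `view = wait_until_finished(lambda: FlowRunView.from_flow_run_id(flow_run_id))` followed by `results = results_by_slug(view.get_all_task_runs())`.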