# ask-community
a
Hey, I have a question about `flow.serve`. I've used it to launch the flow as a long-running process and trigger it via the cloud API. Can I also send a request directly to this flow, instead of going via the cloud API, by setting `webserver=True`?
Here is what I ran, using one of the demo/tutorial flows. The script:
Copy code
from my_gh_workflow import repo_info

if __name__ == "__main__":
    repo_info.serve(
        name="my-first-served-deployment",
        tags=["serve"],
        # parameters={"repo_owner": "PrefectHQ", "repo_name": "prefect"},
        webserver=True
    )
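(For context, `my_gh_workflow.py` is the demo flow from the Prefect tutorial. Roughly something like this, paraphrased from memory; the exact tutorial code may differ:)
```python
# my_gh_workflow.py (sketch): a flow that pulls basic repo stats from the GitHub API.
import httpx
from prefect import flow, task


@task
def get_repo_info(repo_owner: str, repo_name: str) -> dict:
    """Fetch repository metadata from the public GitHub API."""
    response = httpx.get(f"https://api.github.com/repos/{repo_owner}/{repo_name}")
    response.raise_for_status()
    return response.json()


@flow(log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
    repo = get_repo_info(repo_owner, repo_name)
    print(f"{repo_owner}/{repo_name} has {repo['stargazers_count']} stars")
```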
shell:
Copy code
$ export PREFECT_RUNNER_SERVER_HOST="0.0.0.0"
$ export PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS=true
$ python serve.py
I went to the `/docs` endpoint of the FastAPI server that gets launched, and it seemed like it's supposed to do exactly what I want (see the attached screenshot). In the docs, I can send a request to the `/flow/run` endpoint, but there's an internal error. Here is part of the traceback:
Copy code
File "/Users/alexdashly/.pyenv/versions/prefect-tutorial/lib/python3.12/site-packages/prefect/_vendor/fastapi/routing.py", line 251, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alexdashly/.pyenv/versions/prefect-tutorial/lib/python3.12/site-packages/prefect/_vendor/fastapi/routing.py", line 177, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alexdashly/.pyenv/versions/prefect-tutorial/lib/python3.12/site-packages/prefect/runner/server.py", line 204, in _create_flow_run_for_flow_from_fqn
    flow = load_flow_from_entrypoint(body.entrypoint)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/alexdashly/.pyenv/versions/prefect-tutorial/lib/python3.12/site-packages/prefect/flows.py", line 1549, in load_flow_from_entrypoint
    path, func_name = entrypoint.rsplit(":", maxsplit=1)
    ^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 2, got 1)
Am I on the right track and this is just a broken feature, or am I totally off base with how this is supposed to be used?
[Screenshot of the server's API docs]
n
You are on the right track! Regarding `export PREFECT_RUNNER_SERVER_HOST="0.0.0.0"`: can you tell me why you set this like this?
a
When I ran it without setting that, it was throwing an error about a 172.x IP address:
Copy code
❯ python serve.py
Your flow 'repo-info' is being served and polling for scheduled runs!

To trigger a run for this flow, use the following command:

        $ prefect deployment run 'repo-info/my-first-served-deployment'

You can also run your flow via the Prefect UI: <https://app.prefect.cloud/account/79d72e2f-b8e0-4d90-a57a-bd23007bcdfd/workspace/4d1d1784-a0dc-47df-a615-5c46ac28c80f/deployments/deployment/b528a101-0456-4725-acd9-cf4c7629b5a3>

ERROR:    [Errno 49] error while attempting to bind on address ('172.16.3.204', 8080): can't assign requested address
n
In general, what the `/flow/run` webserver endpoint is supposed to allow is running any flow in the import space of a served deployment, whether or not that flow is a "deployment entrypoint" (a rough sketch of what that means is below). Can you say how you're interested in using this endpoint? Like, where do you want to trigger it from? I still need to document this better 🙂 Thanks for exploring the experimental feature!
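To make that concrete, here's a sketch (the file and flow names are made up): a second flow that lives next to your serve script and is never registered as a deployment can still be started through `/flow/run` by giving its entrypoint.
```python
# maintenance_flows.py (hypothetical): a flow that is never passed to .serve(),
# so it has no deployment. Because the serve process can import it, the runner
# webserver's /flow/run endpoint can still load it by entrypoint
# ("maintenance_flows.py:cleanup") and run it.
from prefect import flow


@flow(log_prints=True)
def cleanup(dry_run: bool = True):
    print(f"running cleanup (dry_run={dry_run})")
```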
a
> Can you say how you're interested in using this endpoint? Like, where do you want to trigger it from?
We have a system where a particular app needs to run on a workstation. The task that app runs needs to be triggered when a call is made to our main API by a user via our front end, so we want to relay the API call to that app on the workstation. Obviously we could just do this in a "classic" way, but I was getting Prefect up and running for our general workflow orchestration and exploring the new features since I last used it (which was also Prefect 1). I noticed `flow.serve`, and it looked like a nice out-of-the-box (though maybe not long-term) way to do this. Then I noticed this other part of `flow.serve` and was curious whether we could cut down on latency by routing the API call directly to the workstation, getting the nice benefits of Prefect workflow tracking etc. without having to go through the cloud API. Does this all make sense?
@Nate just wanted to follow up re: that IP address issue as well as the use case. I'm not blocked or anything, just curious whether the use case makes sense.
n
From that description, that use case does make sense to me. I will say:
• the API will register the flow run if you hit `/flow/run`; that flow run just won't be directly associated with a deployment
• the main reason we did this work was to enable non-blocking submission / concurrent execution of subflow runs
However, yeah, if you hit `/flow/run` you're gonna skip one trip to the API and we'll just tell the serve process to run that flow immediately, so it might in fact cut down on latency (rough comparison below). As far as the IP thing, what's the network relationship going on when you had that error? We originally intended for the webserver to be in the same network space as the serve process, but there's some forwarding stuff you could likely do.
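As a rough comparison of the two trigger paths, something like this (a sketch; the deployment name comes from your serve output above, and `run_deployment` is the Python counterpart of the `prefect deployment run` CLI command):
```python
# Sketch: two ways to trigger the same served flow.
from prefect.deployments import run_deployment

# 1) Through the Prefect API: create a run for the deployment and return
#    immediately (timeout=0 means "don't wait for it to finish"). The serve
#    process picks the run up the next time it polls.
run_deployment(
    "repo-info/my-first-served-deployment",
    parameters={"repo_owner": "PrefectHQ", "repo_name": "prefect"},
    timeout=0,
)

# 2) Directly against the runner webserver: POST to /flow/run on the serve
#    process itself, which starts the flow immediately. The run is still
#    registered with the API, just not associated with the deployment.
#    (A concrete request is sketched later in this thread.)
```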
a
That all makes sense. I'll be monitoring to see how this goes.
> As far as the IP thing, what's the network relationship going on when you had that error? We originally intended for the webserver to be in the same network space as the serve process, but there's some forwarding stuff you could likely do.
I was just running on my laptop; I got the error both on my home network and my office network. Admittedly, I have no idea what's happening when running `flow.serve(webserver=True)`.
n
Here's what happens because you say `webserver=True` (i.e. we'll spin up the webserver in a separate thread), which might be TMI 🙂 but in case you're curious. If you have some code I could look at, I could probably help more concretely.
a
Here's the basic script I have. I'm on `2.16.0`. It seems I'm no longer getting the IP bind issue, which is strange; I'm not sure why it's different, since I didn't change versions.
Copy code
$ PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS=true python serve.py
I had success triggering the flow with this endpoint: `/deployment/b528a101-0456-4725-acd9-cf4c7629b5a3/run`. But `/flow/run` is still throwing an error. I'm just passing the example values from `/docs`. Is it because this endpoint is supposed to trigger a named flow via `entrypoint` and I'm not passing a proper name?
Copy code
{
  "entrypoint": "string",
  "parameters": {},
  "parent_task_run_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6"
}
n
Ah, so I probably should have clarified this earlier, but we have a method called `submit_to_runner` which will hit `/flow/run` for you if you have the `flow` object handy; see this example. And yeah, if you want to call it yourself without the flow object, you'd have to provide the actual flow entrypoint and serialize your parameters etc., where the implementation of `submit_to_runner` might be useful if you want to do that.
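For the raw call, it'd be something like this (a sketch I haven't run here; it assumes the serve process from earlier in the thread, the runner webserver on its default port 8080, and that `my_gh_workflow.py` sits in the directory you started `serve.py` from):
```python
# Sketch: hitting the runner webserver's /flow/run endpoint without the flow object.
# The entrypoint must be "<path to the file>:<flow function name>"; the "string"
# placeholder from /docs fails because load_flow_from_entrypoint splits on ":".
import httpx

payload = {
    "entrypoint": "my_gh_workflow.py:repo_info",  # path is relative to the serve process
    "parameters": {"repo_owner": "PrefectHQ", "repo_name": "prefect"},  # JSON-serializable values
}

response = httpx.post("http://localhost:8080/flow/run", json=payload)
response.raise_for_status()
print(response.json())  # metadata about the flow run that was just submitted
```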
a
Ah, okay, I think I'm getting it now. Interesting. Going to revisit this later this week.
n
Sounds good! Let me know if you have any trouble.