I'm trying to start a prefect orion flow from a fl...
# prefect-community
v
I'm trying to start a Prefect Orion flow from a Flask route handler, but I get an error when calling the flow-decorated function: "MissingContextError: No settings context found". What am I missing?
1
a
can you move all code blocks here into the thread?
v
Copy code
from flask import request
from flask_api import FlaskAPI
from prefect import flow, get_run_logger

application = FlaskAPI(__name__)

@flow(name="prefect_2_http_server")
def process_request(data):
    logger = get_run_logger()
    logger.info("new prefect_2_http_server flow : {}".format(data))

@application.route('/event', methods=['POST'])
def push_event():
    process_request(request.data)
    return {
        "ok": True
    }

if __name__ == '__main__':
    application.run(host='0.0.0.0', debug=True, port=8080)
Copy code
13:59:23.491 | INFO    | werkzeug - 127.0.0.1 - - [20/May/2022 13:59:23] "POST /event HTTP/1.1" 500 -
Traceback (most recent call last):
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/flask/app.py", line 2095, in __call__
    return self.wsgi_app(environ, start_response)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/flask/app.py", line 2080, in wsgi_app
    response = self.handle_exception(e)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/flask/app.py", line 2077, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/flask/app.py", line 1525, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/flask_api/app.py", line 106, in handle_user_exception
    raise e
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/flask/app.py", line 1523, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/flask/app.py", line 1509, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/home/val/dev/prefect-poc/src/flows/prefect_2_http_server_min.py", line 14, in push_event
    process_request(request.data)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/prefect/flows.py", line 319, in __call__
    return enter_flow_run_engine_from_flow_call(self, parameters)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/prefect/engine.py", line 110, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/val/.pyenv/versions/3.8.12/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/val/.pyenv/versions/3.8.12/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/prefect/client.py", line 91, in with_injected_client
    client_context = get_client()
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/prefect/client.py", line 101, in get_client
    ctx = prefect.context.get_settings_context()
  File "/home/val/dev/prefect-poc/venv/lib/python3.8/site-packages/prefect/context.py", line 230, in get_settings_context
    raise MissingContextError("No settings context found.")
prefect.exceptions.MissingContextError: No settings context found.
a
is the terminal from which you run it authenticated with your Cloud 2.0 account? you can check that using prefect config view
also: what's your use case here? yesterday you were running real-time streaming jobs with Prefect 2.0 and today you are building Flask apps on top 😄 you are really pushing the envelope
(I mean it in a positive way @Valentin Baert)
a
@Valentin Baert did you get this working? @Anna Geller I also have this use case of wanting to dynamically parameterize workflows on an api request and have prefect run them.
The parameters would get parsed at the route handler, and then the handler would stitch together tasks based on the parameters inside of the request. This would generate flows/sub flows on api call
a
you should be able to do that already using this approach:
Copy code
import asyncio
from prefect.client import get_client


async def main():
    async with get_client() as client:
        depl_id = "074db2e5-229a-460e-85ad-fca31b379fd2"
        response = await client.create_flow_run_from_deployment(depl_id)
        print(response)


if __name__ == "__main__":
    asyncio.run(main())
here are more details
👍 1
Copy code
async def create_flow_run_from_deployment(
        self,
        deployment_id: UUID,
        *,
        parameters: Dict[str, Any] = None,
        context: dict = None,
        state: schemas.states.State = None,
        flow_runner: "FlowRunner" = None,
    ) -> schemas.core.FlowRun:
        """
        Create a flow run for a deployment.

        Args:
            deployment: The deployment model to create the flow run from
            parameters: Parameter overrides for this flow run. Merged with the
                deployment defaults
            context: Optional run context data
            state: The initial state for the run. If not provided, defaults to
                `Scheduled` for now. Should always be a `Scheduled` type.
            flow_runner: An optional flow runner to use to execute this flow run.

        Raises:
            httpx.RequestError: if Orion does not successfully create a run for any reason

        Returns:
            The flow run model
        """
docstring
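To illustrate the parameters argument from the docstring above, here is a minimal sketch of overriding deployment defaults when a request comes in. It assumes the Prefect 2.0 beta client shown earlier in the thread; the helper name trigger_deployment and the "data" parameter are hypothetical, and the client is passed in as an argument so the helper can be tried without a running Orion API.

```python
import asyncio
from typing import Any, Dict


async def trigger_deployment(
    client: Any, deployment_id: str, parameters: Dict[str, Any]
) -> Any:
    """Create a flow run for an existing deployment, overriding its default parameters."""
    # `parameters` is keyword-only in the signature quoted above.
    return await client.create_flow_run_from_deployment(
        deployment_id, parameters=parameters
    )


async def main() -> None:
    # Imported here so the helper above can be exercised without Prefect installed.
    from prefect.client import get_client

    async with get_client() as client:
        flow_run = await trigger_deployment(
            client,
            "074db2e5-229a-460e-85ad-fca31b379fd2",  # deployment ID from the earlier snippet
            {"data": "payload from the API request"},  # hypothetical parameter name
        )
        print(flow_run)
```

Running asyncio.run(main()) against a configured PREFECT_API_URL should schedule one run of the deployment; the overrides are merged with the deployment defaults, as the docstring says.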
a
Does this work on self hosted?
a
yes 100%
a
Amazing
a
as long as your prefect config points at your self-hosted PREFECT_API_URL you are good to go
a
The only issue is the deployment id
a
you can set it using:
Copy code
prefect config set PREFECT_API_URL=your_url
🙏 1
👀 1
a
The flow might not even exist until the api call
a
yes, the flow must be deployed first, but the parameters can be overridden at runtime
if you want to do it without deployment you can just call the decorated flow function and this will work too
if your prefect config points at your PREFECT_API_URL, then running/calling a function decorated with @flow will make the flow run visible in the Orion UI
v
Hello, currently my use case is to build a POC of the different use cases provided to me, in order to build a toolbox for a future team that deals with API and data integration. One of the use cases is the ability to trigger a flow from an API call, so that's why I'm testing this. Currently I still have the issue mentioned earlier: MissingContextError: No settings context found. Do you know what it means? I checked prefect config view, and everything looks right:
a
Did you try the code I shared via Discourse? Otherwise, can you share the code that gave you that error?
v
No because I was trying to do it without a deployment
The error is triggered when I send an HTTP POST request to localhost:8080 with a dummy json body
This is weird: I don't understand why I was able to trigger a flow from a regular Python script implementing a Kafka listener but not from a Flask route handler. Is there some special behavior I'm not aware of that would prevent triggering a flow without a deployment from a Flask route handler?
a
if you try calling the flow without a deployment, you would need to make sure that the instance/machine running flask app is authenticated with your Cloud account to be able to call flows this way
v
To authenticate the Flask app, is it not enough to have the environment variables set?
This is what I did earlier with my Kafka POC and it worked
But here, it doesn't seem to
a
do you want to commit your sample code to your Gitlab repo so I can check how you use it in the Flask app context?
in theory correct, setting the env variables should be enough, not sure why your Flask app has difficulty finding those
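One way to rule out the environment is to report what the web server process itself sees. The sketch below is only an illustration: PREFECT_API_URL comes from this thread, while PREFECT_API_KEY is an assumed variable name for the API key, and describe_prefect_env is a hypothetical helper.

```python
import os
from typing import Dict, Mapping, Optional

REQUIRED = ("PREFECT_API_URL",)  # named earlier in this thread
OPTIONAL = ("PREFECT_API_KEY",)  # assumption: variable name used for the Cloud API key


def describe_prefect_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Report which Prefect-related variables are visible, hiding secret values."""
    env = os.environ if environ is None else environ
    report: Dict[str, str] = {}
    for name in REQUIRED + OPTIONAL:
        value = env.get(name)
        if not value:
            report[name] = "missing"
        elif name.endswith("_KEY"):
            report[name] = "set (hidden)"  # never echo the key itself
        else:
            report[name] = value
    return report


if __name__ == "__main__":
    for name, status in describe_prefect_env().items():
        print(f"{name}: {status}")
```

Calling describe_prefect_env() from inside the route handler, rather than from the shell, shows the environment of the process that actually runs the flow.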
btw why do you think it's necessary to have a flask app to call your flows from? this is pretty much the functionality of the Orion API server - I worry that you recreate functionality that already exists in Orion
v
About the use case: this is because we want at some point to allow a user to trigger a workflow, but of course we won't give that user a Prefect API key 😛 It will be secured and enriched with other stuff and will have to pass through our API manager
My example here is minimalistic and does not represent the full use-case, I'm just testing the essential bits to clearly document to my team what to use, when and where
a
I see - I still struggle a bit to understand why you couldn't accomplish this using Orion API and why you believe that building an extra API on top is required in your use case - Cloud 2.0 will have RBAC functionality later this year. It might be better to discuss your use case with someone from sales@prefect.io before you end up building an orchestrator on top of an orchestrator - do you know what I mean?
and thanks for sharing the code, I'll have a look
v
I don't think we are building an orchestrator on top of an orchestrator. For example, a use case for us in the future would be:
• Having a well-defined and documented API for integrations between us and our partners
• When we receive an API call on some endpoint, it would need to process the data in multiple steps in various systems.
• If we use a flow and tasks for each step of the process, we would be able to gain visibility into the steps involved.
• We can't tell the partner to call the Orion API directly because we would need to give them an API key etc. and manage permissions or roles inside Prefect. We want to keep the orchestrator as an internal tool only and hide the internals from the partner to provide them with a user-friendly, business-oriented API.
I don't understand what's not right in this use case. Sometimes our integration flows will be triggered when receiving an event from Kafka (this was the use-case POC I tried earlier and it works), sometimes from an API call (current POC), and some other times it would be a recurring task that fetches data from an FTP server (in that case I tried with a scheduled deployment and it is fine)
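The "hide the internals" idea above could be sketched as a small translation step between the partner-facing request body and the parameters handed to the flow. Everything here is hypothetical: the field names, the "data" flow parameter, and the helper to_flow_parameters are placeholders for illustration, not part of Prefect or of the code shared in this thread.

```python
import json
from typing import Any, Dict

# Hypothetical fields the business-facing API accepts from a partner.
ALLOWED_FIELDS = {"order_id", "partner_ref"}


def to_flow_parameters(raw_body: bytes) -> Dict[str, Any]:
    """Validate a partner request body and map it onto flow parameters."""
    payload = json.loads(raw_body)
    if not isinstance(payload, dict):
        raise ValueError("expected a JSON object")
    unknown = set(payload) - ALLOWED_FIELDS
    if unknown:
        # Reject anything the public contract does not document.
        raise ValueError(f"unexpected fields: {sorted(unknown)}")
    # "data" mirrors the single argument of process_request(data) in the first snippet.
    return {"data": payload}
```

A route handler would call to_flow_parameters(request.data) and pass the result to the flow or to a deployment run, so partners never see flow names, deployment IDs, or API keys.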
a
thanks for explaining the use case @Valentin Baert - those are all totally valid use cases for Prefect. At first glance, I don't understand why you get the context errors, and I asked the team if they have an idea. I'd recommend perhaps trying a new virtual environment with Prefect, ensuring the API key and context are set correctly, and then trying to install Flask and run your Flask app on top to see if that helps?
z
What version of AnyIO do you have?
🙏 1
v
Hi, I have no direct dependency on anyio, it only comes from prefect. Here is what I see using pipdeptree:
Copy code
$ pipdeptree --reverse --packages anyio
Warning!!! Possibly conflicting dependencies found:
* grpcio-status==1.46.1
 - grpcio [required: >=1.46.1, installed: 1.43.0]
------------------------------------------------------------------------
anyio==3.6.1
  - httpcore==0.14.7 [requires: anyio==3.*]
    - httpx==0.22.0 [requires: httpcore>=0.14.5,<0.15.0]
      - prefect==2.0b4 [requires: httpx>=0.22]
  - prefect==2.0b4 [requires: anyio>=3.4.0]
  - starlette==0.19.1 [requires: anyio>=3.4.0,<5]
    - fastapi==0.78.0 [requires: starlette==0.19.1]
      - prefect==2.0b4 [requires: fastapi>=0.70]
or with pip :
Copy code
$ pip freeze | grep anyio
anyio==3.6.1
Copy code
python --version
Python 3.8.12
I also tried to recreate a clean venv but the issue remains. Python version 3.8.12, requirements:
Copy code
prefect==2.0b4
Flask-API
Also tried with python 3.10.4 (latest as of now) and a clean venv, the issue is the same
I saw that there was a new beta of prefect 2.0b5 released on May 18th, same issue
Also tried to force anyio==3.4.0 since this was the minimum requirement by prefect but it doesn't change anything
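The same version information can also be collected from inside the interpreter that runs the app, which avoids any mismatch between the shell's pip and the venv in use. This is a minimal sketch using only the standard library (importlib.metadata, available from Python 3.8); installed_versions is a hypothetical helper name, and the package list is just the set mentioned in this thread.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each package, or None if it is absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    found = installed_versions(["prefect", "anyio", "httpx", "fastapi", "flask"])
    for name, version in found.items():
        print(f"{name}=={version}" if version else f"{name} is not installed")
```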
a
thanks Valentin, let's see if @Zanie has any recommendations but I tried to reproduce and there must be some Flask weirdness here. Running your flow with a Flask app causes exactly the same error for me:
Copy code
from flask import Flask
from prefect import flow, get_run_logger

app = Flask(__name__)


@flow(name="valentin_flask")
def process_request():
    logger = get_run_logger()
    logger.info("Hello from Flask - received  input: %s")


@app.route("/event", methods=["GET"])
def push_event():
    process_request()
    return {"ok": True}


if __name__ == "__main__":
    app.run(host="0.0.0.0", debug=True, port=8080)
But running a FastAPI app instead 😎
Copy code
"""
uvicorn valentin_fastapi:app
"""
from fastapi import FastAPI
from prefect import flow, get_run_logger

app = FastAPI(title="Valentin App")


@flow(name="valentin_flask")
def process_request():
    logger = get_run_logger()
    logger.info("Hello from Flask - received  input: %s")


@app.get("/event")
def push_event():
    process_request()
    return {"ok": True}
works perfectly fine:
Copy code
INFO:     Started server process [34419]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     127.0.0.1:53096 - "GET / HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:53096 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:53102 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:53102 - "GET /openapi.json HTTP/1.1" 200 OK
14:53:08.146 | INFO    | prefect.engine - Created flow run 'qualified-herring' for flow 'valentin_flask'
14:53:08.147 | INFO    | Flow run 'qualified-herring' - Using task runner 'ConcurrentTaskRunner'
14:53:08.741 | INFO    | Flow run 'qualified-herring' - Hello from Flask - received  input: %s
14:53:09.664 | INFO    | Flow run 'qualified-herring' - Finished in state Completed()
switching from Flask to FastAPI could help here quite a lot when it comes to dependency management since Prefect 2.0 relies on FastAPI anyway and things could be easier to manage this way
v
ok thank you I take note of that and will be using FastAPI for now
👍 1
z
@Anna Geller can you open an issue with the Flask MRE? We’d like to have support for flask eventually.
a
b
Acknowledging that this issue is still open, I'm wondering if there are any end-to-end examples of a Prefect project running (e.g. using the built-in docker-compose stack) and then a separate "Hello World" Flask app that invokes a job with GraphQL or some other API? How do folks normally do this end-to-end?
1. Register the flow via python CLI, results in a flow ID
2. <save the flow ID>
   a. How is it extracted from step 1?
3. Setup the Flask project to invoke Prefect, using <saved flow ID>
Thanks in advance!
1
z
I’ll have a fix for this Flask issue open today
🙏 2
a
marking as since Michael answered both here and on the issue