Hi. My scenario is to create a prefect flow and th...
# prefect-community
s
Hi. My scenario is to create a prefect flow and then run it using an API call. I am able to run any already created flow using POST calls (screenshot1 attached). When the POST call is invoked, the function prefect_flow is called. But I am not getting idea on how to create flows by POST calls. (screenshot2 attached for create flow code without any reference to API). I tried to encapsulate the whole task and flow inside one function (temp_prefect_run, line#9), and then calling that function for POST call, but I am getting internal server error. When checking the detailed logs: if I am doing f.register(project_name), the error says ->
Copy code
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002A9EA91E100>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
My prefect server is not at localhost. if I am doing client.register(flow, project_name), the error says ->
Copy code
raise ValueError("This flow has no storage to build")
ValueError: This flow has no storage to build
What is the best way to run POST calls which can create a new flow?
k
You need to point your local machine to the server endpoint. You can do it with the env var
Copy code
PREFECT__SERVER__ENDPOINT=<your-endpoint>
or in the config.toml
Copy code
[server]
endpoint=...
s
that I have done
because the run flow is working
if I create a flow in prefect and use the POST call to just run it, everything is working fine.
but when I am trying to create the flow itself using the POST call and the code shown above, I am getting above mentioned issues
k
So if you purely want a post call, you have to work with the serialized JSON of the Flow and hit the endpoint. There’s a bunch of work
flow.register()
is doing so it’s not recommended to do it yourself.
flow.register()
or the CLI is recommended
s
ok, i'll try flow.register
k
And then
flow.register()
will use that endpoint you configured
s
sorry, i already tried flow.register . line#21 in my code
this is my config.toml
k
Is this
config.toml
on the same machine you are calling the register from?
s
yes
k
Why is your backend cloud there? Not sure if related but I think it should be
server
s
i didn't do any change there, but i'll make that change and re-run prefect agent, my application and make the post call
k
Yeah the default is cloud and you need to change to server, normally with
prefect backend server
s
I did that. Still receiving the same error
k
What happens when you do
prefect agent local start
?
s
starting properly
Most of my runs here are run by POST calls from the code.
everything works fine when running the flow. creating the flow has issue.
k
That actually looks good. I’m confused why the flow creation is failing. Can you give me a longer traceback?
s
Copy code
INFO:     127.0.0.1:53927 - "POST /workflow/vmproject/tf1 HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 373, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 75, in __call__
    return await <http://self.app|self.app>(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\fastapi\applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\middleware\errors.py", line 181, in __call__
    raise exc from None
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\middleware\errors.py", line 159, in __call__
    await <http://self.app|self.app>(scope, receive, _send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\exceptions.py", line 82, in __call__
    raise exc from None
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\exceptions.py", line 71, in __call__
    await <http://self.app|self.app>(scope, receive, sender)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\routing.py", line 580, in __call__
    await route.handle(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\routing.py", line 241, in handle
    await <http://self.app|self.app>(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\routing.py", line 52, in app
    response = await func(request)
File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\fastapi\routing.py", line 159, in run_endpoint_function
    return await dependant.call(**values)
  File "C:\Users\dummyvalue\dummyvalue1\Documents\composr\orchestratr-api\.\app\workflow\wf_router.py", line 30, in prefect_run_flow
    test = temp_prefect_run(project_name, flow_name)
  File "C:\Users\dummyvalue\dummyvalue1\Documents\composr\orchestratr-api\.\app\tempdelete\prefectterraform.py", line 21, in temp_prefect_run
    f.register(project_name=project_name)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\core\flow.py", line 1708, in register
    registered_flow = client.register(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 839, in register
    project = self.graphql(query_project).data.project  # type: ignore
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 452, in graphql
    result = <http://self.post|self.post>(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 407, in post
    response = self._request(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 641, in _request
    response = self._send_request(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 506, in _send_request
    response = <http://session.post|session.post>(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\sessions.py", line 590, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001DDFB8917F0>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
is this ok? OR can I provide something else
k
I think this is fine. Pretty confused where it’s picking this up
Especially cuz you registered flows successsfully previous right?
Wait no in the config.toml on the machine with server or your local machine trying to hit the API?
You need to point the local machine to the correct GraphQL endpoint
Was that agent you started on the same machine as the server or the local machine from where you are trying to register the Flow./
s
config.toml, prefect agent, and API are on my local machine. Prefect is installed in a VM in cloud
k
That is very confusing why it’s not working. You have this line where you create the client. Could you try removing that and try registering a flow?
s
tried this. still the same issue
there is some issue when it is calling run_terraform function. I tried to return a test string after the with statement and it works. but if I am returning the test string from run_terraform function(line#12), it is not working
k
pretty confused. can you try exporting the env variable instead?
Copy code
PREFECT__SERVER__ENDPOINT="YOUR_MACHINES_PUBLIC_IP:4200/graphql"
what do you mean by it works? You can register a Flow? I don’t think the return does anything in the Flow block?
s
sorry, it works means the post call is returning the value "Line#20". which means the problem is coming up when calling run_terraform function
k
I don’t think so, but that’s a good thought. Maybe we can try with a simpler script first?
Copy code
@task
def abc():
    return "hello"

with Flow(flow_name) as flow:
    abc()

flow.register(project_name)
and see if it works
s
not working
k
Did you try the env var yet?
How did you get the previous Flows registered?
s
tried it this way. getting the same error
earlier I did not make post calls. i ran the code directly from my cmd using
Copy code
python <filename.py>
k
Wait, why can’t you just do that to register flows? Why the need for a post call?
a
I'm trying to catch up here, but I think this is a bit too unstructured to follow. @Shuchita Tripathi could you take a step back and try doing the following? 1. Go through this documentation to deploy your Prefect Server 2. Once your Server is up, start a simple agent e.g. a local agent 3. Run a simple hello world flow to confirm this is working 4. Move step-by-step to the setup you wish to get e.g. if you prefer to use a different agent than a local agent, do that first 5. Then add your preferred storage mechanism, etc
Basically, before running any complex use cases like triggering flows via API calls, it would be easier to first verify that all your Server components and your orchestration layer (your agent) as well as your storage are working correctly
s
our requirement is to make a post call which creates and then runs the flows.
Anna - I have done that. all my prefect flows are working. the only problem is with create flows
k
That POST call seems to be affecting the endpoint Prefect is hitting. We don’t recommend creating flows outside the
flow.register()
and
prefect register …
CLI, because these commands do a lot of formatting and handle a lot of logic for you
a
can you share your working flow? We could then trigger the same flow from your local terminal using:
Copy code
prefect run --name yourflowname --project yourproject --watch
if this command works, we could move to triggering the same flow via an API call
s
we are in process of creating an automation, which will use terraform to build some resources in a cloud. for this, we have plans to make POST calls which can create and run flows. running already existing flow is working fine through that POST call. creating a new flow is not working with POST.
this command is also giving error
k
That looks weird because it’s hitting port 80?
s
that's what i thought
k
The client.register() method has too much logic to bring to handle the POST call yourself
s
just to show that existing flow is successfully running using POST call
k
The flow run creation is significantly easier because the endpoint can just take in a Flow id, but for the creation, the Python flow is also massaged a lot to get in the right form
Still really confused about why the wrong endpoint is being hit, but it’s hard to know without diving deep into the setup. From our end though we really recommend just using the
flow.register()
and
prefect register
commands
s
ok. thank you. I'll see how I can modify it to make it work
This is the output of the above command
Copy code
prefect run --name "terraform flow" --project vmproject --watch
k
That looks successful?
s
yes. but this is not a POST call
POST call is still not happening. i tried with client.register(build=false) and added a storage for flow -> this created the flow. but it is not running
the agent is not picking it up
k
This is an example for the
create_flow_run
post
s
create flow run is not the problem. it is working fine if the flow is created without build=False.
k
A bit confused. So you create a Flow run that was registered but it’s not running? That sounds like an issue with labels? What are the labels for your Flow and agent?
s
i'll re-iterate the whole problem: My usecase is to make POST call to create flow and then run them. POST call is successful is I am running an existing flow. however creating a new flow is not working with POST. it is working when I am running it as a normal python code (python <filename.py>) Now, I tried multiple way to make flow creation work with POST. when I am running client.register(flow, project, build=false), the flow gets created, but then it does not run. it just gets stuck on waiting. has build=false anything to do with it?
k
Yeah it sounds like a label mismatch between the agent and the created flow. Are you familiar with labels ?
Agents can only pick up flows with the same labels
s
not very familiar. i am going through the documentation now
after a lot of hit and trial, things are working now. I am posting my solution here, but it can still get a lot better. Since flow.register was only taking localhost:4200, not the IP from config.toml, i used client.register() to create flows. To use client.register, I had to configure storage. So i used storage=Local(). So now the flow was getting created, but it wasn't getting picked up by agent. So I added run_config=LocalRun(labels=['<label_name_used with agent>']). now everything is working fine. Here is my code snippet.