s

Shuchita Tripathi

03/31/2022, 1:58 PM
Hi. My scenario is to create a Prefect flow and then run it using an API call. I am able to run any already-created flow using POST calls (screenshot1 attached). When the POST call is invoked, the function prefect_flow is called. But I can't figure out how to create flows through POST calls (screenshot2 attached shows the create-flow code without any reference to the API). I tried to encapsulate the whole task and flow inside one function (temp_prefect_run, line#9) and then call that function from the POST handler, but I am getting an internal server error. Checking the detailed logs: if I do f.register(project_name), the error says ->
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002A9EA91E100>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
My Prefect server is not at localhost. If I do client.register(flow, project_name), the error says ->
raise ValueError("This flow has no storage to build")
ValueError: This flow has no storage to build
What is the best way to run POST calls which can create a new flow?
k

Kevin Kho

03/31/2022, 2:00 PM
You need to point your local machine to the server endpoint. You can do it with the env var
PREFECT__SERVER__ENDPOINT=<your-endpoint>
or in the config.toml
[server]
endpoint=...
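One thing to keep in mind with these settings: Prefect 1.x reads its configuration when the package is first imported, so the variables have to be present in the environment of the process that calls register (here, the API server process) before `import prefect` runs. A minimal sketch of doing that from Python, assuming a hypothetical endpoint address and setting `PREFECT__BACKEND` alongside the variable above:

```python
import os


def configure_prefect_env(endpoint: str) -> None:
    """Set Prefect 1.x config overrides via environment variables.

    Must run before the first `import prefect` in this process,
    because the configuration is loaded at import time.
    """
    os.environ["PREFECT__BACKEND"] = "server"
    os.environ["PREFECT__SERVER__ENDPOINT"] = endpoint


# Hypothetical address (documentation IP range); replace with your own.
# The /graphql suffix follows the form suggested later in this thread.
configure_prefect_env("http://203.0.113.10:4200/graphql")
```

Exporting the same variables in the shell that starts the API server should have the same effect.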
s

Shuchita Tripathi

03/31/2022, 2:00 PM
that I have done
because the run flow is working
if I create a flow in prefect and use the POST call to just run it, everything is working fine.
but when I am trying to create the flow itself using the POST call and the code shown above, I am getting above mentioned issues
k

Kevin Kho

03/31/2022, 2:03 PM
So if you purely want a POST call, you have to work with the serialized JSON of the Flow and hit the endpoint. There’s a bunch of work flow.register() is doing, so it’s not recommended to do it yourself. flow.register() or the CLI is recommended
s

Shuchita Tripathi

03/31/2022, 2:04 PM
ok, i'll try flow.register
k

Kevin Kho

03/31/2022, 2:04 PM
And then
flow.register()
will use that endpoint you configured
s

Shuchita Tripathi

03/31/2022, 2:05 PM
sorry, i already tried flow.register . line#21 in my code
this is my config.toml
k

Kevin Kho

03/31/2022, 2:06 PM
Is this config.toml on the same machine you are calling the register from?
s

Shuchita Tripathi

03/31/2022, 2:07 PM
yes
k

Kevin Kho

03/31/2022, 2:07 PM
Why is your backend set to cloud there? Not sure if it's related, but I think it should be server
s

Shuchita Tripathi

03/31/2022, 2:08 PM
i didn't do any change there, but i'll make that change and re-run prefect agent, my application and make the post call
k

Kevin Kho

03/31/2022, 2:09 PM
Yeah the default is cloud and you need to change to server, normally with
prefect backend server
s

Shuchita Tripathi

03/31/2022, 2:15 PM
I did that. Still receiving the same error
k

Kevin Kho

03/31/2022, 2:18 PM
What happens when you do
prefect agent local start
?
s

Shuchita Tripathi

03/31/2022, 2:19 PM
starting properly
Most of my runs here are run by POST calls from the code.
everything works fine when running the flow. creating the flow has issue.
k

Kevin Kho

03/31/2022, 2:23 PM
That actually looks good. I’m confused why the flow creation is failing. Can you give me a longer traceback?
s

Shuchita Tripathi

03/31/2022, 2:28 PM
INFO:     127.0.0.1:53927 - "POST /workflow/vmproject/tf1 HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 373, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\fastapi\applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\middleware\errors.py", line 181, in __call__
    raise exc from None
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\middleware\errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\exceptions.py", line 82, in __call__
    raise exc from None
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\routing.py", line 580, in __call__
    await route.handle(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\routing.py", line 241, in handle
    await self.app(scope, receive, send)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\starlette\routing.py", line 52, in app
    response = await func(request)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\fastapi\routing.py", line 159, in run_endpoint_function
    return await dependant.call(**values)
  File "C:\Users\dummyvalue\dummyvalue1\Documents\composr\orchestratr-api\.\app\workflow\wf_router.py", line 30, in prefect_run_flow
    test = temp_prefect_run(project_name, flow_name)
  File "C:\Users\dummyvalue\dummyvalue1\Documents\composr\orchestratr-api\.\app\tempdelete\prefectterraform.py", line 21, in temp_prefect_run
    f.register(project_name=project_name)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\core\flow.py", line 1708, in register
    registered_flow = client.register(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 839, in register
    project = self.graphql(query_project).data.project  # type: ignore
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 452, in graphql
    result = self.post(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 407, in post
    response = self._request(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 641, in _request
    response = self._send_request(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\prefect\client\client.py", line 506, in _send_request
    response = session.post(
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\sessions.py", line 590, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "C:\Users\dummyvalue\AppData\Local\pypoetry\Cache\virtualenvs\automation-servicr-YfkC5kYT-py3.9\lib\site-packages\requests\adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001DDFB8917F0>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
Is this OK, or can I provide something else?
k

Kevin Kho

03/31/2022, 2:30 PM
I think this is fine. Pretty confused about where it’s picking this up
Especially because you registered flows successfully previously, right?
Wait, no. Is the config.toml on the machine with the server, or on your local machine that is trying to hit the API?
You need to point the local machine to the correct GraphQL endpoint
Was the agent you started on the same machine as the server, or on the local machine from which you are trying to register the Flow?
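One way to check which endpoint the registering process actually sees is to print the resolved configuration from inside that process (for example, from the API route itself). A rough sketch, assuming Prefect 1.x; the helper names are hypothetical, and `prefect.config.cloud.api` is, as far as I know, the value the client falls back to when no `api_server` is passed:

```python
import os
from typing import Dict, Mapping


def prefect_env_overrides(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the PREFECT__* variables visible to this process."""
    return {k: v for k, v in environ.items() if k.startswith("PREFECT__")}


def resolved_prefect_target() -> Dict[str, str]:
    """Report the backend and endpoints Prefect 1.x resolved from its config.

    Prefect is imported lazily so this module still loads where it is absent.
    """
    import prefect

    return {
        "backend": prefect.config.backend,
        "server_endpoint": prefect.config.server.endpoint,
        "client_api": prefect.config.cloud.api,
    }
```

If `resolved_prefect_target()` reports localhost:4200 when called inside the API process but the right address in a plain `python` session, the two processes are not reading the same configuration.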
s

Shuchita Tripathi

03/31/2022, 2:42 PM
config.toml, prefect agent, and API are on my local machine. Prefect is installed in a VM in cloud
k

Kevin Kho

03/31/2022, 2:46 PM
That is very confusing why it’s not working. You have this line where you create the client. Could you try removing that and try registering a flow?
s

Shuchita Tripathi

03/31/2022, 2:57 PM
tried this. still the same issue
there is some issue when it is calling run_terraform function. I tried to return a test string after the with statement and it works. but if I am returning the test string from run_terraform function(line#12), it is not working
k

Kevin Kho

03/31/2022, 3:04 PM
pretty confused. can you try exporting the env variable instead?
PREFECT__SERVER__ENDPOINT="YOUR_MACHINES_PUBLIC_IP:4200/graphql"
what do you mean by it works? You can register a Flow? I don’t think the return does anything in the Flow block?
s

Shuchita Tripathi

03/31/2022, 3:06 PM
sorry, it works means the post call is returning the value "Line#20". which means the problem is coming up when calling run_terraform function
k

Kevin Kho

03/31/2022, 3:07 PM
I don’t think so, but that’s a good thought. Maybe we can try with a simpler script first?
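from prefect import Flow, task

@task
def abc():
    return "hello"

# flow_name and project_name are defined elsewhere,
# e.g. the arguments of temp_prefect_run
with Flow(flow_name) as flow:
    abc()

flow.register(project_name)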
and see if it works
s

Shuchita Tripathi

03/31/2022, 3:10 PM
not working
k

Kevin Kho

03/31/2022, 3:11 PM
Did you try the env var yet?
How did you get the previous Flows registered?
s

Shuchita Tripathi

03/31/2022, 3:13 PM
tried it this way. getting the same error
earlier I did not make post calls. i ran the code directly from my cmd using
python <filename.py>
k

Kevin Kho

03/31/2022, 3:15 PM
Wait, why can’t you just do that to register flows? Why the need for a post call?
a

Anna Geller

03/31/2022, 3:15 PM
I'm trying to catch up here, but I think this is a bit too unstructured to follow. @Shuchita Tripathi could you take a step back and try doing the following?
1. Go through this documentation to deploy your Prefect Server
2. Once your Server is up, start a simple agent, e.g. a local agent
3. Run a simple hello world flow to confirm this is working
4. Move step-by-step to the setup you wish to get, e.g. if you prefer to use a different agent than a local agent, do that first
5. Then add your preferred storage mechanism, etc.
Basically, before running any complex use cases like triggering flows via API calls, it would be easier to first verify that all your Server components and your orchestration layer (your agent) as well as your storage are working correctly
s

Shuchita Tripathi

03/31/2022, 3:17 PM
our requirement is to make a post call which creates and then runs the flows.
Anna - I have done that. all my prefect flows are working. the only problem is with create flows
k

Kevin Kho

03/31/2022, 3:19 PM
That POST call seems to be affecting the endpoint Prefect is hitting. We don’t recommend creating flows outside of flow.register() and the prefect register … CLI, because these commands do a lot of formatting and handle a lot of logic for you
a

Anna Geller

03/31/2022, 3:19 PM
can you share your working flow? We could then trigger the same flow from your local terminal using:
prefect run --name yourflowname --project yourproject --watch
if this command works, we could move to triggering the same flow via an API call
s

Shuchita Tripathi

03/31/2022, 3:19 PM
we are in process of creating an automation, which will use terraform to build some resources in a cloud. for this, we have plans to make POST calls which can create and run flows. running already existing flow is working fine through that POST call. creating a new flow is not working with POST.
this command is also giving error
k

Kevin Kho

03/31/2022, 3:22 PM
That looks weird because it’s hitting port 80?
s

Shuchita Tripathi

03/31/2022, 3:23 PM
that's what i thought
k

Kevin Kho

03/31/2022, 3:25 PM
The client.register() method has too much logic to replicate if you handle the POST call yourself
s

Shuchita Tripathi

03/31/2022, 3:26 PM
just to show that existing flow is successfully running using POST call
k

Kevin Kho

03/31/2022, 3:29 PM
The flow run creation is significantly easier because the endpoint can just take in a Flow id, but for the creation, the Python flow is also massaged a lot to get in the right form
Still really confused about why the wrong endpoint is being hit, but it’s hard to know without diving deep into the setup. From our end, though, we really recommend just using the flow.register() and prefect register commands
s

Shuchita Tripathi

03/31/2022, 3:31 PM
ok. thank you. I'll see how I can modify it to make it work
This is the output of the above command
prefect run --name "terraform flow" --project vmproject --watch
k

Kevin Kho

03/31/2022, 5:42 PM
That looks successful?
s

Shuchita Tripathi

03/31/2022, 5:45 PM
yes. but this is not a POST call
POST call is still not happening. I tried client.register(build=False) and added a storage for the flow -> this created the flow, but it is not running
the agent is not picking it up
k

Kevin Kho

03/31/2022, 5:48 PM
This is an example for the create_flow_run POST
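The linked example is not preserved in this archive. As a stand-in, here is a minimal sketch of what such a request could look like using only the standard library; it is not the example that was shared. It builds a GraphQL `create_flow_run` mutation for a given flow id, and the endpoint address, flow id, and helper name are all hypothetical:

```python
import json
import urllib.request


def build_create_flow_run_request(endpoint: str, flow_id: str) -> urllib.request.Request:
    """Build (but do not send) a GraphQL POST that creates a flow run."""
    # json.dumps yields a safely quoted string literal for the flow id.
    query = (
        "mutation { create_flow_run(input: { flow_id: %s }) { id } }"
        % json.dumps(flow_id)
    )
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical endpoint and flow id, for illustration only.
request = build_create_flow_run_request(
    "http://203.0.113.10:4200/graphql",
    "00000000-0000-0000-0000-000000000000",
)
# To actually send it: urllib.request.urlopen(request)
```

This only covers creating a run for a flow that is already registered; registration itself involves much more than a single mutation.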
s

Shuchita Tripathi

03/31/2022, 5:49 PM
create flow run is not the problem. it is working fine if the flow is created without build=False.
k

Kevin Kho

03/31/2022, 5:53 PM
A bit confused. So you create a Flow run that was registered but it’s not running? That sounds like an issue with labels? What are the labels for your Flow and agent?
s

Shuchita Tripathi

03/31/2022, 5:55 PM
I'll reiterate the whole problem. My use case is to make a POST call to create flows and then run them. The POST call is successful if I am running an existing flow. However, creating a new flow is not working with POST; it works when I run it as normal Python code (python <filename.py>). I tried multiple ways to make flow creation work with POST. When I run client.register(flow, project, build=False), the flow gets created, but then it does not run. It just gets stuck on waiting. Does build=False have anything to do with it?
k

Kevin Kho

03/31/2022, 5:58 PM
Yeah it sounds like a label mismatch between the agent and the created flow. Are you familiar with labels ?
Agents can only pick up flows with the same labels
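To make the matching rule concrete: as I understand the Prefect 1.x docs, the check is a subset test rather than strict equality, meaning every label on the flow run must also be present on the agent. A small illustrative model of that rule (the function name is hypothetical, not part of Prefect):

```python
from typing import Iterable


def agent_can_pick_up(flow_labels: Iterable[str], agent_labels: Iterable[str]) -> bool:
    """True when every label on the flow run is also on the agent."""
    return set(flow_labels) <= set(agent_labels)


print(agent_can_pick_up(["dev"], ["dev", "my-hostname"]))  # True
print(agent_can_pick_up(["dev", "my-hostname"], ["dev"]))  # False
```

The second case is the typical "stuck" situation: the flow carries a label the agent was not started with, so no agent ever claims the run.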
s

Shuchita Tripathi

03/31/2022, 5:59 PM
not very familiar. i am going through the documentation now
After a lot of trial and error, things are working now. I am posting my solution here, but it can still get a lot better. Since flow.register was only using localhost:4200, not the IP from config.toml, I used client.register() to create flows. To use client.register, I had to configure storage, so I used storage=Local(). Now the flow was getting created, but it wasn't getting picked up by the agent, so I added run_config=LocalRun(labels=['<label_name_used with agent>']). Now everything is working fine. Here is my code snippet.
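The snippet itself did not survive in this archive. What follows is not the original code, only a minimal sketch of the approach described above, assuming Prefect 1.x. The function name, the single `say_hello` task, and passing the endpoint through `Client(api_server=...)` are all assumptions made for illustration:

```python
def create_and_register(flow_name: str, project_name: str,
                        api_server: str, agent_label: str) -> str:
    """Build a one-task flow and register it through an explicit Client."""
    # Imported lazily: Prefect 1.x loads its config at import time.
    from prefect import Client, Flow, task
    from prefect.run_configs import LocalRun
    from prefect.storage import Local

    @task
    def say_hello():
        return "hello"

    with Flow(
        flow_name,
        # Without storage, register fails with "This flow has no storage to build".
        storage=Local(),
        # The label has to be one the agent was started with.
        run_config=LocalRun(labels=[agent_label]),
    ) as flow:
        say_hello()

    # Assumption: the server endpoint is passed explicitly instead of via config.
    client = Client(api_server=api_server)
    return client.register(flow, project_name=project_name)
```

A POST handler could then call something like `create_and_register("tf1", "vmproject", "<your-endpoint>", "<label_name_used with agent>")` and return the resulting flow id.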