Hello folks, I'm trying to register my flow with U...
# prefect-community
q
Hello folks, I'm trying to register my flow with UI, I ran the
docker-compose
and
docker
files added these lines:
Copy code
flow.run_agent()
c = Client()
c.create_flow_run()
but facing this error:
n
Hi @Questionnaire - any reason you're running
docker-compose
directly and not using the
prefect server start
command?
q
actually I added few of my components in it also like pg-admin and mongo
or can you guide me through this process...
Like, I created my own project added simple script having tasks and flow
n
Got it - it looks like you're missing the
flow_id
from that
create_flow_run
method
q
when I was running running
prefect server start
it gives error:
Copy code
ERROR: Couldn't connect to Docker daemon at <http+docker://localhost> - is it running?
If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Exception caught; killing services (press ctrl-C to force)
ERROR: Couldn't connect to Docker daemon at <http+docker://localhost> - is it running?

If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
Traceback (most recent call last):
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/cli/server.py", line 302, in start
    ["docker-compose", "pull"], cwd=compose_dir_path, env=env
  File "/usr/lib/python3.6/subprocess.py", line 311, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tanmay/.local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/home/tanmay/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/tanmay/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/tanmay/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/tanmay/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/tanmay/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/cli/server.py", line 318, in start
    ["docker-compose", "down"], cwd=compose_dir_path, env=env
  File "/usr/lib/python3.6/subprocess.py", line 356, in check_output
    **kwargs).stdout
  File "/usr/lib/python3.6/subprocess.py", line 438, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['docker-compose', 'down']' returned non-zero exit status 1.
n
In that case it looks like either docker isn't running on the machine, or the docker daemon isn't accessible because of some permissions issue. You'll want to take a look at the docker docs for how to set up docker permissions.
q
Yeah
Now,
prefect server start
is working fine, but can you tell me how to pass my postgres credentials in it?
n
Are you wanting to use your own persistent db outside of the docker network?
q
Yes
n
Let me check what the best way to do that is
q
sure
n
Alright so it looks like we don't have a great way to natively support this with the CLI command so you'll need to run the
docker-compose
file manually. If you're using the one that comes with Prefect Server, you can run this command:
DB_CONNECTION_URL=your_db_endpoint docker-compose -f src/prefect/cli/docker-compose.yml up -d
q
Okay, so it's better for me to make my own
So, let's get back to the problem I have earlier
n
Yes, did you try providing a
flow_id
to the
create_flow_run
method?
q
how to get the flow_id ?
n
You can retrieve the
flow_id
after you've registered the flow using
flow.register()
, but it looks like that step is failing, according to your main error message. Can you share the contents of
main.py
?
q
Ahhh, I can't share but can you tell me the expected errors?
like until I was running it using flow.run() only it is working fine
c
@Questionnaire it looks like you are instantiating non-serializable objects outside of your task definitions. E.g., if you open a Client connection or a database connection outside of one of your tasks, Prefect has no way of “saving” that connection for re-use in a new process
q
Yeah, I'm making mongo connection and getting all secrets at the top of my script... outside of flow.... So, what should I do?
@Chris White
c
You should open connections within your Task’s runtime logic, not outside of your Flow’s logic
q
Okay
Will this work? If I write these lines in Flow and pass
con
in task
Copy code
pg_db = PrefectSecret("PG_DB")
    pg_username = PrefectSecret("PG_USERNAME")
    pg_password = PrefectSecret("PG_PASS")
    pg_hostname = PrefectSecret("PG_HOST")
    con = PostgresFetch(pg_db, pg_username, pg_password, pg_hostname, port=5432)
c
if
PostgresFetch
is a task then yup that’ll work!
(as long as you aren’t running on Dask or trying to checkpoint your task data)
q
I'm trying to make the postgres connection inside task but it's giving
Some Error Occured in Fetching Data from PG Database
n
@Questionnaire can you provide some code for us to help debug?
q
Sure
Copy code
from prefect import Flow
from prefect.tasks.postgres.postgres import PostgresFetch
from prefect.tasks.secrets import PrefectSecret


@task(max_retries=5, retry_delay=timedelta(seconds=10))
def extract():
    try:
        pg_db = PrefectSecret("PG_DB")
        pg_username = PrefectSecret("PG_USERNAME")
        pg_password = PrefectSecret("PG_PASS")
        pg_hostname = PrefectSecret("PG_HOST")
        con = PostgresFetch(pg_db, pg_username, pg_password, pg_hostname, port=5432)
        records = con.run(fetch='many', query='SELECT * FROM public."TableName"')
        print(records)
    except:
        print('Some Error Occured in Fetching Data from PG Database')


with Flow('TEST-ETL') as flow:
    extract()

flow.run()
n
@Questionnaire how are you defining those `PrefectSecret`s?
c
You’re mixing abstractions in this code block; it appears you are initializing Secret tasks within another task and feeding those as initialization kwargs; if you want to use
PrefectSecret
tasks those should be represented at the Flow-level, otherwise you should either call
run
on each of those tasks within your
extract
task or use the lower-level Prefect Secrets API
upvote 1
q
pg_db = PrefectSecret("PG_DB").run()
like this?
👍 1
c
yup that should do the trick
q
Now, everything is fixed but getting this error:
Copy code
Result check: OK
Traceback (most recent call last):
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 426, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 421, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.6/http/client.py", line 1356, in getresponse
    response.begin()
  File "/usr/lib/python3.6/http/client.py", line 307, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.6/http/client.py", line 268, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 725, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 403, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 426, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 421, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.6/http/client.py", line 1356, in getresponse
    response.begin()
  File "/usr/lib/python3.6/http/client.py", line 307, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.6/http/client.py", line 268, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "main.py", line 307, in <module>
    flow.register()
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1429, in register
    no_url=no_url,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 640, in register
    version_group_id=version_group_id,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 218, in graphql
    token=token,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 178, in post
    token=token,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 304, in _request
    response = <http://session.post|session.post>(url, headers=headers, json=params, timeout=30)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
while registering flow and run_agent, Please tell me how to get the flow_id to pass in client.
n
Hey @Questionnaire that's a pretty generic error thrown by Postgres, usually due to errors with credentials, have you confirmed those are all correct?
q
But it was working when I did flow.run() and gives error when I try registering the flow
c
flow.run
runs everything in process without connecting to any external APIs or backends; registering a flow prepares it for deployment by talking to a Prefect API / backend
q
But if the credentials was wrong then how it's running the query and fetching the data
n
Sorry @Questionnaire, I think you modified your comment earlier so I hadn't seen when that was happening
q
Oh, I'm sorry for that @nicholas
👍 1
n
@Questionnaire where is that script running? In a container on the docker network?
q
@nicholas Running it Locally on my system using python3 script.py
n
Got it -
flow.register()
will try to communicate with either Prefect Cloud or Prefect Server; in order for it to communicate with your deployment of Prefect Server, a few things will need to be true: 1. Your local machine will need to be able to access your deployment of Prefect Server 2. Port 4200 will need to be exposed on the VM you've deployed Prefect Server 3. Your local Prefect will need to know where to send requests (wherever you've deployed Prefect Server) through the server endpoint, which you can set in
~/.prefect/config.toml
as such:
Copy code
[server]
endpoint = "YOUR_MACHINES_PUBLIC_IP:4200/graphql"
4. You'll need to tell Prefect to communicate with a Prefect Server installation by calling
prefect backend server
You may want to try getting a local version of Prefect Server working before diving into a complicated setup like this; I think that'll help you learn some of the Prefect concepts before having to get into these networking issues
q
Yeah, due to some custom need I'm running UI through my custom
docker-compose
n
Can you expand a bit on the custom need?
q
Like I want mongo, mongo admin and pgadmin for development. So, instead of using
prefect flow register
, I'm running my .yaml file @nicholas
c
@Questionnaire have you tried signing up for a free Cloud account? https://cloud.prefect.io/ — I think it would remove the complexity of getting a Prefect backend configured
q
Ok let me check
@nicholas I added the
[server]
and getting the error:
Copy code
Result check: OK
Traceback (most recent call last):
  File "main.py", line 308, in <module>
    flow.register()
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1429, in register
    no_url=no_url,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 640, in register
    version_group_id=version_group_id,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 218, in graphql
    token=token,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 178, in post
    token=token,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 304, in _request
    response = <http://session.post|session.post>(url, headers=headers, json=params, timeout=30)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 637, in send
    adapter = self.get_adapter(url=request.url)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 728, in get_adapter
    raise InvalidSchema("No connection adapters were found for {!r}".format(url))
requests.exceptions.InvalidSchema: No connection adapters were found for '192.168.42.92:4200/graphql/graphql/alpha'
n
@Questionnaire you're missing the hyperlink protocol in that connection string, e.g.
http
Also did you call
prefect backend server
?
q
Yes @nicholas, I did and now getting this error:
Copy code
Result check: OK
Traceback (most recent call last):
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/lib/python3.6/http/client.py", line 1264, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.6/http/client.py", line 1310, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.6/http/client.py", line 1259, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.6/http/client.py", line 1038, in _send_output
    self.send(msg)
  File "/usr/lib/python3.6/http/client.py", line 976, in send
    self.connect()
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connection.py", line 172, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f040ed2e160>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 725, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/tanmay/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='192.168.42.92', port=4200): Max retries exceeded with url: /graphql/graphql/alpha (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f040ed2e160>: Failed to establish a new connection: [Errno 111] Connection refused',))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "main.py", line 308, in <module>
    flow.register()
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1429, in register
    no_url=no_url,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 640, in register
    version_group_id=version_group_id,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 218, in graphql
    token=token,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 178, in post
    token=token,
  File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 304, in _request
    response = <http://session.post|session.post>(url, headers=headers, json=params, timeout=30)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/home/tanmay/.local/lib/python3.6/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='192.168.42.92', port=4200): Max retries exceeded with url: /graphql/graphql/alpha (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f040ed2e160>: Failed to establish a new connection: [Errno 111] Connection refused',))