# ask-community
c
Hi - I am looking for a bit of help. I have created some Python flows and tasks which mostly execute against the k8s agent on Azure just fine. Each flow has a task that makes a REST POST call, but some longer-running calls get an error after 30 mins and an exception is raised. I am certain the REST endpoint is OK, because I can run a call that lasts 1 hour without any problem outside of the flow. I am pretty new to Python, but I will do my best to describe the whole scenario. The exception appears to be raised within the urllib3 library - the timeout I set is a single value of 14400 seconds (4 hours), but 4 of my 20+ flows are long-running and each of them blows up after 30 mins. I have isolated the problem to either the Python code or the k8s agent, but I do not know which. Any suggestions would be really appreciated! thanks and regards charles
a
Hi @Charles Hunt, could you perhaps move the traceback and all subsequent messages to this thread so that we can keep the main channel a bit cleaner? 🙂 It looks like there can be several issues here, but we need to look at them step by step:
1. Before moving to Kubernetes, could you show me your Flow so that I can reproduce it locally and check whether there are any issues there?
2. Then, when it comes to the Kubernetes agent on Azure - how did you start it? Did you assign any labels to this agent?
When it comes to exception handling:
• you generally don't need to do that in Prefect - instead of suppressing errors, Prefect will use them to provide visibility when tasks in your Flow fail and let you take action based on that, e.g. set a notification on failure (see the sketch below)
• what is this POST request doing - does it return something immediately?
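A minimal sketch of a failure notification via a Prefect 1.x state handler - the handler name and the print-based "notification" below are illustrative placeholders, not code from this thread:
Copy code
from prefect import task
from prefect.engine import state

def notify_on_failure(task_obj, old_state, new_state):
    # called on every state change; only act when the task enters a Failed state
    if isinstance(new_state, state.Failed):
        # replace this print with a real notification (Slack, email, PagerDuty, ...)
        print(f"Task {task_obj.name} failed: {new_state.message}")
    return new_state

@task(state_handlers=[notify_on_failure], log_stdout=True)
def ingest_task():
    ...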
c
the python code to call the REST api is:
Copy code
#call the simpleingest endpoint
response = requests.post(apiendpointurl, headers=headers, data=requestdata, timeout=14400)
the exception is caught in a try-except block with
Copy code
except:
     logger.error("General runtime error during API call.")
     raise RuntimeError("General runtime error during API call.")
What I would like to happen is to avoid the exception being raised at all and for the request to wait nicely until the REST API has completed
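For context, a single timeout value in requests is not a cap on total request duration - it is the connect/read timeout, where the read timeout is the maximum wait between bytes from the server. A minimal sketch of the (connect, read) tuple form, reusing the variable names from the task code; note this cannot stop the peer or intermediate infrastructure from resetting an idle connection:
Copy code
import requests

# timeout can be a (connect, read) tuple: 10 s to establish the connection,
# up to 4 h of silence between bytes once connected. It does not bound the
# total call duration and cannot prevent a connection reset by the peer.
response = requests.post(
    apiendpointurl,      # assumed to be defined as in the task code
    headers=headers,
    data=requestdata,
    timeout=(10, 14400),
)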
I am using the requests library (which uses urllib3 under the hood), and the error (and traceback) I get is:
Copy code
Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer
Copy code
# prefect libraries for accessing prefect runtime api
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# psycopg2 libraries for accessing PostgreSQL
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests
# used for date functions
from datetime import datetime, timedelta

# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True

@task(log_stdout=True)
def ingest_task():
    logger = prefect.context.get("logger")
    apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
    headers = {
        'Content-type':'application/json',
        'api-key':'secretkey'
    }

    # get db config
    dbconfig = Secret("fsdb").get()

    # set up default hwm date and days
    hwmdate = (datetime.today() - timedelta(days = 1)).strftime('%Y/%m/%d')
    days = 0
    
    # get db hwm
    with connect(f"dbname={dbconfig['dbname']} host={dbconfig['host']} user={dbconfig['user']} password={dbconfig['password']} sslmode=require", cursor_factory=RealDictCursor) as pgcon:
        with pgcon.cursor() as pgcursor:

            # run the sql
            pgcursor.execute("SELECT max(next_max_hwm) as hwm FROM staging.high_water_marks where source_table = 'AAAAA_BBBBBB';")
            if pgcursor.rowcount > 0 :
                # extract the result
                pgrecord = pgcursor.fetchone()
                logger.info(f"Control: High water mark date: {pgrecord['hwm']}")
                hwm = datetime.date(pgrecord['hwm'])                
                hwmdate = hwm.strftime('%Y/%m/%d')
                days = (datetime.date(datetime.today()) - hwm).days    
                logger.info(f"Number of days delta: {days}")
                if days > 0 :
                    requestdata = f"{{'JobName': 'delta_prod','DataJobId': 105, 'JobPathId': 'YYYYYYYY', 'DataJobType': 'Bounces', 'StartDate': '{hwmdate}', 'NumberOfDays': {days}, 'Identifiers': '1D3E4DAC-00000-0000-0000-10BD9A9869BC,ZZZ'}}"

                    # log the endpoint
                    logger.info(apiendpointurl)

                    # log the inputs
                    logger.info(requestdata)
                    
                    try:
                        #call the simpleingest endpoint
                        response = requests.post(apiendpointurl, headers=headers, data=requestdata, timeout=14400)

                        if "completedSuccess" not in response.text:
                            logger.error(response.json())
                            raise RuntimeError('SmartDataLoader API did not complete successfully')      

                    except RuntimeError as exc:
                        logger.error("API runtime failure.")
                        raise RuntimeError from exc
                    except TimeoutError as exc:
                        logger.error("API Timeout")
                        raise TimeoutError from exc
                    except:
                        logger.error("General runtime error during API call.")
                        raise RuntimeError("General runtime error during API call.")
                    else:
                        logger.info(response.json())
                else:
                    logger.info("Up to date with delta")

with Flow("git-prod-bronto-bounces-flow") as flow:
    ingest_task()

flow.storage = GitHub(
    repo="XXXXXXX",
    path="YYYYYYYY/flows/production/bronto/bronto_bounces_delta_flow.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
    )
flow.run_config = KubernetesRun(labels=["prod"],env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})
flow.register(project_name="YYYYYYYY")
Hi @Anna Geller thank you for your response - sorry about posting follow-on messages on the main thread; I had not intended to do that - I thought I had posted on the question thread
@Anna Geller I have posted the python code - this is a code pattern I have deployed 20+ times and it works very reliably on flows that take less than 30 mins to complete
to answer your questions: 1. the flow is posted above 2. I am not sure how the agent was started - it was deployed by a colleague - I expect using the default labels.
The requests.post makes a request to a listener which goes off to perform an ingest task - when it is complete a response is issued with status and error information.
as you can probably see in the code, although it traps exceptions it re-raises them to ensure the flow is flagged with a failure.
Copy code
03:26:13
ERROR
ingest_task
General runtime error during API call.
03:26:13
ERROR
CloudTaskRunner
Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 532, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<string>", line 64, in ingest_task
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
    return request('post', url, data=data, json=json, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "<string>", line 78, in ingest_task
RuntimeError: General runtime error during API call.
My hunch is that either some configuration/option is missing on the requests object, which means the long-running HTTP request connection is reset by the Python stack, or the container aborts the connection. But I don't know enough about either Python or the container config to know whether either of those is correct. The testing I have done on the API host indicates that it is not resetting the connection, as I can execute a request running for more than 1 hour without issue using an ad hoc request tool I wrote in C#.
a
first of all, thank you so much for moving all the information into the thread! 🙏 I researched this a bit and it looks like your issue may occur because the Azure Load Balancer drops connections in the AKS Kubernetes cluster, as described here. The issue was already reported by the community in this thread - sharing in case you want to have a look. Main findings:
• Azure advised increasing the idle timeout on the load balancer, but according to a person from the community this solution did not fix the problem and the connection reset still appeared after roughly 4 minutes
• There is an open PR to fix this: https://github.com/PrefectHQ/prefect/pull/5066
c
@Anna Geller The REST API used to be hosted by the Azure web application service, which indeed has a 4 minute timeout. We ran into that and have since mitigated it by re-hosting the API on an Azure Linux VM, so the 4 minute problem has been worked around; this issue is at 30 minutes. As I mentioned, I can run a long-running request against the Azure VM for at least 60 minutes without any problem. The AKS-related post mentions a 4 minute threshold, but I don't know whether that is the issue or not. I will try to test the Python code outside of the k8s agent to see if it works.
a
@Charles Hunt you're 100% correct with that. If this is urgent for you, perhaps you can try the fix from the PR (i.e. deploy a new AKS Kubernetes agent using the code from the PR) and chime in with your findings. Alternatively, as a temporary workaround, perhaps you can (as you already mentioned):
• create a VM on Azure, start a LocalAgent there, and run this flow on the local agent until the AKS Azure issue is fixed and released
• temporarily disable the load balancer for this specific cluster and see if the issue occurs again
c
@Anna Geller Thank you for the ideas. I think we will try running a local agent on the VM - do you have a link to a resource on how to set that up?
a
Sure! But it’s really a single command after installing Prefect:
Copy code
prefect agent local start
But you may want to set up Supervisor to run it as a background process:
• Docs: https://docs.prefect.io/orchestration/agents/local.html#flow-configuration
• Supervisor: https://docs.prefect.io/orchestration/agents/local.html#using-with-supervisor
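For reference, moving a registered flow to the local agent is mostly a matter of swapping its run_config - a minimal sketch, assuming Prefect 1.x and that the agent on the VM is started with a matching prod label (e.g. prefect agent local start --label prod):
Copy code
from prefect.run_configs import LocalRun

# labels must match the labels the local agent was started with,
# otherwise the agent will not pick up the flow run
flow.run_config = LocalRun(labels=["prod"])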
c
@Anna Geller I have set up a local agent on the Azure VM and it is working well - all the tasks that worked against the k8s agent are still performing well, but curiously I now have almost the opposite problem with the longer-running flows. The local agent is installed and running on the same host as the REST endpoint listener. The Python REST call waits indefinitely even though I can see the REST endpoint has completed the request, logs that it has completed, and returns a successful (no error) response - the Python-based REST call just sits and waits and waits. I think I must be making a silly mistake, but the same code pattern works well for the shorter-running tasks.
Copy code
import socket
from urllib3.connection import HTTPConnection

TCP_KEEPALIVE = 0x10
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    (socket.IPPROTO_TCP, TCP_KEEPALIVE, 60),
]
I found this bit of code which suggests that the keepalive needs to be configured on the HTTPConnection
I'm going to give it a go
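Worth noting that 0x10 corresponds to the macOS TCP_KEEPALIVE option; in a Linux container the equivalent knobs are socket.TCP_KEEPIDLE / TCP_KEEPINTVL / TCP_KEEPCNT. A per-session variant that avoids patching the global default_socket_options - a minimal sketch, assuming a Linux environment and illustrative keepalive intervals:
Copy code
import socket
import requests
from requests.adapters import HTTPAdapter
from urllib3.connection import HTTPConnection

class KeepAliveAdapter(HTTPAdapter):
    """Send TCP keepalives so idle long-running requests are not silently dropped."""
    def init_poolmanager(self, *args, **kwargs):
        kwargs["socket_options"] = HTTPConnection.default_socket_options + [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
            (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60),   # start probing after 60 s idle
            (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30),  # probe every 30 s
            (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5),     # give up after 5 failed probes
        ]
        super().init_poolmanager(*args, **kwargs)

session = requests.Session()
session.mount("http://", KeepAliveAdapter())
response = session.post(apiendpointurl, headers=headers, data=requestdata, timeout=14400)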
a
Does this job run for a very long time?
c
the REST endpoint takes about 10 mins (600 secs) to respond if there is data available to process.
a
my idea was that since you mentioned you now run your agent on the same host as the API, you could change this to localhost:
Copy code
apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
c
ok - I will try
Copy code
apiendpointurl = 'http://localhost:5000/SimpleIngest'
a
overall, it’s hard for me to help since I don’t know your API. But I would try determining which component is a root cause here. You can try running this ingest job as a normal Python script without Prefect so that you can check whether this is a Prefect issue or a general DevOps configuration issue with the VM or API. If Prefect is not an issue, then you could investigate the API and networking a bit more.
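A minimal standalone sketch of that isolation test, run directly on the VM without Prefect - the URL, headers, and payload below are illustrative placeholders for the real values the task builds:
Copy code
import requests

apiendpointurl = "http://localhost:5000/SimpleIngest"   # placeholder endpoint
headers = {"Content-type": "application/json", "api-key": "secretkey"}
requestdata = "{'JobName': 'delta_prod', ...}"          # same payload the task builds

# if this also fails after ~30 minutes, the problem is not Prefect
response = requests.post(apiendpointurl, headers=headers, data=requestdata, timeout=14400)
print(response.status_code)
print(response.text[:500])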
c
I have been able to run long-running API requests against the VM-based endpoint on Azure, so I have eliminated that as the root cause - I believe the issue now resides in the Python urllib3 stack, perhaps complicated by some Azure networking.
so I am trying the "localhost" idea in the hope that the urllib3 Python stack behaves better
I am not a native Python developer, but of course it is the language of choice for writing the flows. I feel I have got most of the way to what I need, but a few troubling flows are tripping me up, and it all revolves around the REST API request.
I just tested the flow using "localhost:5000" - it seemed to work and the response came back after 10 mins
👏
Thank you @Anna Geller - your ideas really helped me.
🙌 1
a
Nice work! Great to hear that!