Thread
#prefect-community

    Charles Hunt

    10 months ago
    Hi - I am looking for a bit of help. I have created some Python flows and tasks which mostly execute against the k8 agent on Azure just fine. The task in each flow makes a REST POST call, but some longer-running calls get an error after 30 mins and an exception is raised. I am certain the REST endpoint is OK, because I can run a call which lasts 1 hour without any problem outside of the flow. I am pretty new to Python, but I will do my best to describe the whole scenario. The exception appears to be raised within the urllib3 library. The timeout I set is a single value of 14400 seconds (4 hours), but 4 of my 20+ flows are long running and each of them blows up after 30 mins. I have isolated the problem to either the Python code or the k8 agent, but I do not know which. Any suggestions would be really appreciated! Thanks and regards, Charles

    Anna Geller

    10 months ago
    Hi @Charles Hunt, could you perhaps move the traceback and all subsequent messages to this thread so that we can keep the main channel a bit cleaner? 🙂 It looks like there can be several issues here, but we need to look at them step by step:
    1. Before moving to Kubernetes, could you show me your Flow so that I could reproduce locally and check if there are no issues there?
    2. Then, when it comes to the Kubernetes agent on Azure - how did you start it? did you assign any labels to this agent?
    When it comes to the exception handling:
    • you generally don’t need to do that in Prefect - instead of suppressing errors, Prefect will use it to provide visibility when tasks in your Flow fail, and let you take action based on that e.g. set notification on failure
    • what is this POST request doing - does it return something immediately?

    Charles Hunt

    10 months ago
    the python code to call the REST api is:
    #call the simpleingest endpoint
     response = requests.post(apiendpointurl, headers=headers, data = requestdata, timeout=14400)
    the exception is caught in a try-except block with
    except:
         logger.error("General runtime error during API call.")
         raise RuntimeError("General runtime error during API call.")
    What I would like to happen is to avoid the exception being raised at all and the request to wait nicely until the REST api has completed
    I am using the requests urllib3 library, and the error (and traceback) I get is:
    Task 'ingest_task': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
        chunked=chunked,
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
        six.raise_from(e, None)
      File "<string>", line 3, in raise_from
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
        httplib_response = conn.getresponse()
      File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
        response.begin()
      File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
        version, status, reason = self._read_status()
      File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
        line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
      File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
        return self._sock.recv_into(b)
    ConnectionResetError: [Errno 104] Connection reset by peer
    # prefect libraries for accessing prefect runtime api
    import prefect
    from prefect import task, Flow
    from prefect.storage import GitHub
    from prefect.run_configs import KubernetesRun
    from prefect.client import Secret
    
    # psycopg2 libraries for accessing PostgreSQL
    import psycopg2
    from psycopg2 import connect, sql
    import psycopg2.extras
    from psycopg2.extras import RealDictCursor
    
    # used for making API calls
    import requests
    # used for date functions
    from datetime import datetime, timedelta
    
    # have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
    # set flow config to log to cloud server
    prefect.config.cloud.send_flow_run_logs = True
    
    @task(log_stdout=True)
    def ingest_task():
        logger = prefect.context.get("logger")
        apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
        headers = {
            'Content-type':'application/json',
            'api-key':'secretkey'
        }
    
        # get db config
        dbconfig = Secret("fsdb").get()
    
        # set up default hwm date and days
        hwmdate = (datetime.today() - timedelta(days = 1)).strftime('%Y/%m/%d')
        days = 0
        
        # get db hwm
        with connect(f"dbname={dbconfig['dbname']} host={dbconfig['host']} user={dbconfig['user']} password={dbconfig['password']} sslmode=require", cursor_factory=RealDictCursor) as pgcon:
            with pgcon.cursor() as pgcursor:
    
                # run the sql
                pgcursor.execute("SELECT max(next_max_hwm) as hwm FROM staging.high_water_marks where source_table = 'AAAAA_BBBBBB';")
                if pgcursor.rowcount > 0 :
                    # extract the result
                    pgrecord = pgcursor.fetchone()
                    logger.info(f"Control: High water mark date: {pgrecord['hwm']}")
                    hwm = datetime.date(pgrecord['hwm'])
                    hwmdate = hwm.strftime('%Y/%m/%d')
                    days = (datetime.date(datetime.today()) - hwm).days
                    logger.info(f"Number of days delta: {days}")
                    if days > 0 :
                        requestdata = f"{{'JobName': 'delta_prod','DataJobId': 105, 'JobPathId': 'YYYYYYYY', 'DataJobType': 'Bounces', 'StartDate': '{hwmdate}', 'NumberOfDays': {days}, 'Identifiers': '1D3E4DAC-00000-0000-0000-10BD9A9869BC,ZZZ'}}"
    
                        # log the endpoint
                        logger.info(apiendpointurl)

                        # log the inputs
                        logger.info(requestdata)
                        
                        try:
                            #call the simpleingest endpoint
                            response = requests.post(apiendpointurl, headers=headers, data = requestdata, timeout=14400)

                            if "completedSuccess" not in response.text:
                                logger.error(response.json())
                                raise RuntimeError('SmartDataLoader API did not complete successfully')      
    
                        except RuntimeError as exc:
                            logger.error("API runtime failure.")
                            raise RuntimeError from exc
                        except TimeoutError as exc:
                            logger.error("API Timeout")
                            raise TimeoutError from exc
                        except:
                            logger.error("General runtime error during API call.")
                            raise RuntimeError("General runtime error during API call.")
                        else:
                            logger.info(response.json())
                    else:
                        logger.info("Up to date with delta")
    
    with Flow("git-prod-bronto-bounces-flow") as flow:
        ingest_task()
    
    flow.storage = GitHub(
        repo="XXXXXXX",
        path="YYYYYYYY/flows/production/bronto/bronto_bounces_delta_flow.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
        )
    flow.run_config = KubernetesRun(labels=["prod"],env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})
    flow.register(project_name="YYYYYYYY")
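The request body in the task above is assembled with an f-string and single quotes, which is not strictly valid JSON even though the header declares application/json. A minimal sketch of building the same payload with the standard json module, assuming the endpoint accepts standard JSON (the thread does not confirm this). `build_request_data` is a hypothetical helper name; the field values are copied from the flow.

```python
import json
from datetime import date, datetime


def build_request_data(hwm, today=None):
    """Return (days, json_body) for the delta since the high water mark."""
    today = today or date.today()
    # Accept either a datetime or a date for the high water mark.
    hwm_date = hwm.date() if isinstance(hwm, datetime) else hwm
    days = (today - hwm_date).days
    payload = {
        "JobName": "delta_prod",
        "DataJobId": 105,
        "JobPathId": "YYYYYYYY",
        "DataJobType": "Bounces",
        "StartDate": hwm_date.strftime("%Y/%m/%d"),
        "NumberOfDays": days,
        "Identifiers": "1D3E4DAC-00000-0000-0000-10BD9A9869BC,ZZZ",
    }
    # json.dumps produces double-quoted keys and strings.
    return days, json.dumps(payload)


# Illustrative dates only, not taken from the thread.
days, body = build_request_data(datetime(2021, 11, 1, 3, 0), today=date(2021, 11, 4))
print(days, body)
```

The returned string can be passed as `data=` to the existing call unchanged.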
    Hi @Anna Geller thank you for your response- sorry about posting follow on messages on the main thread - I had not intended to do that - I thought I had posted on the question thread
    @Anna Geller I have posted the python code - this is a code pattern I have deployed 20+ times and it works very reliably on flows that take less than 30 mins to complete
    to answer your questions:
    1. the flow is posted above
    2. I am not sure how the agent was started - it was deployed by a colleague - I expect using the default labels.
    The requests.post makes a request to a listener which goes off to perform an ingest task - when it is complete a response is issued with status and error information.
    you can probably see in the code, although it traps exceptions it raises them again to ensure the flow is flagged with a failure.
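A minimal sketch of the trap-and-re-raise pattern described here, written so that the root cause stays attached to the new exception. It uses only the standard library: `call_with_context` and `fake_send` are hypothetical names, and `fake_send` stands in for the real `requests.post` call. One related point: timeouts raised by requests are `requests.exceptions.Timeout`, which does not derive from the built-in `TimeoutError`, so the `except TimeoutError` branch in the flow would not be expected to catch them.

```python
import logging

logger = logging.getLogger("ingest_example")


def call_with_context(send):
    """Run send() and re-raise any failure with the original error as its cause."""
    try:
        return send()
    except Exception as exc:
        # Log the concrete error type instead of a generic message.
        logger.error("API call failed: %r", exc)
        # "from exc" stores the original error on __cause__.
        raise RuntimeError("General runtime error during API call.") from exc


def fake_send():
    # Hypothetical stand-in that fails the same way as the traceback in this thread.
    raise ConnectionResetError(104, "Connection reset by peer")


try:
    call_with_context(fake_send)
except RuntimeError as err:
    print(type(err.__cause__).__name__)
```

The flow still fails, so Prefect still flags the run, but the log message names the underlying error.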
    03:26:13
    ERROR
    ingest_task
    General runtime error during API call.
    03:26:13
    ERROR
    CloudTaskRunner
    Task 'ingest_task': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
        chunked=chunked,
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
        six.raise_from(e, None)
      File "<string>", line 3, in raise_from
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
        httplib_response = conn.getresponse()
      File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
        response.begin()
      File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
        version, status, reason = self._read_status()
      File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
        line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
      File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
        return self._sock.recv_into(b)
    ConnectionResetError: [Errno 104] Connection reset by peer
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
        timeout=timeout
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
        method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
      File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 532, in increment
        raise six.reraise(type(error), error, _stacktrace)
      File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 769, in reraise
        raise value.with_traceback(tb)
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
        chunked=chunked,
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
        six.raise_from(e, None)
      File "<string>", line 3, in raise_from
      File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
        httplib_response = conn.getresponse()
      File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
        response.begin()
      File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
        version, status, reason = self._read_status()
      File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
        line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
      File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
        return self._sock.recv_into(b)
    urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "<string>", line 64, in ingest_task
      File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
        return request('post', url, data=data, json=json, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
        return session.request(method=method, url=url, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
        resp = self.send(prep, **send_kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
        r = adapter.send(request, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
        raise ConnectionError(err, request=request)
    requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
        logger=self.logger,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "<string>", line 78, in ingest_task
    RuntimeError: General runtime error during API call.
    My hunch is that either some configuration/option is missing on the requests object, so that the long-running HTTP connection is reset by the Python stack, or the container aborts the connection. But I don't know enough about either Python or the container config to know if either of those is correct. The testing I have done on the API host indicates that it is not resetting the connection, as I can execute a request running for more than 1 hour without issue using a tool (which I wrote in C#) to make ad hoc requests against the API.

    Anna Geller

    10 months ago
    first of all, thank you so much for moving all the information into the thread! 🙏 I researched this a bit and it looks like your issue may occur due to the Azure Load Balancer dropping connections in an AKS Kubernetes cluster, as described here. The issue was already reported by the community in this thread - sharing in case you want to have a look. Main findings:
    • Azure has given the advice to increase the idle timeout on the load balancer, but according to a person from the community, this solution did not fix the problem, and the connection reset was still appearing after roughly 4 minutes
    • There is an open PR to fix this: https://github.com/PrefectHQ/prefect/pull/5066

    Charles Hunt

    10 months ago
    @Anna Geller The REST api used to be hosted by the azure web application service which indeed does have a 4 minute time out, which we ran into and have since mitigated by re-hosting the API onto an Azure Linux VM, so the 4 minute problem has been worked around. this issue is 30 minutes. As I mentioned I can run a long running request against the Azure VM for at least 60 minutes without any problem. The AKS related post mentions a 4 minute threshold but I don't know if that is the issue or not. I will try and test the python code outside of the k8 agent to see if it can work.

    Anna Geller

    10 months ago
    @Charles Hunt you’re 100% correct with that. If this is urgent for you, perhaps you can try the fix from the PR (i.e. deploy a new AKS Kubernetes agent using the code from the PR) and chime in with your findings. Alternatively, as a temporary workaround, perhaps you can (as you already mentioned):
    • create a VM on Azure, start a LocalAgent there and run this flow on a local agent, until the AKS Azure issue is fixed and released
    • temporarily disable load balancer for this specific cluster, and see if the issue occurs again

    Charles Hunt

    10 months ago
    @Anna Geller Thank you for the ideas. I think we will try running a local agent on the VM - do you have a link to a resource on how to set that up?

    Anna Geller

    10 months ago
    Sure! But it’s really a single command after installing Prefect:
    prefect agent local start
    But you may want to set up Supervisor to make it run as a background process:
    • Docs: https://docs.prefect.io/orchestration/agents/local.html#flow-configuration
    • Supervisor: https://docs.prefect.io/orchestration/agents/local.html#using-with-supervisor

    Charles Hunt

    10 months ago
    @Anna Geller I have set up a local agent on the Azure VM and it is working well - all the tasks that worked against the K8 agent are still performing well, but curiously I now have almost the opposite problem with the longer running flows. The local agent is installed and running on the same host as the REST endpoint listener. The Python REST call waits indefinitely, even though I can see that the REST endpoint has completed the request and logged a successful (no error) response - the Python-based REST call just sits and waits and waits and waits. I think I must be making a silly mistake, but the same code pattern I am using works well against the shorter running tasks.
    import socket
    from urllib3.connection import HTTPConnection
    TCP_KEEPALIVE = 0x10
    HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
        (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
        (socket.IPPROTO_TCP, TCP_KEEPALIVE, 60),
    ]
    I found this bit of code which suggests that the keepalive needs to be configured on the HTTPConnection
    I'm going to give it a go
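A hedged variant of the snippet above for a Linux host. The value 0x10 appears to be the macOS constant for TCP_KEEPALIVE; on Linux the idle-time option is exposed as `socket.TCP_KEEPIDLE`, so this sketch uses the named constants and only adds the ones the platform defines. `keepalive_socket_options` is a hypothetical helper name, the interval and count values are illustrative, and the thread does not say whether keepalive settings resolved the problem.

```python
import socket


def keepalive_socket_options(idle=60, interval=60, count=5):
    """Build (level, option, value) tuples that enable TCP keepalive probes."""
    options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    # These names exist on Linux; guard each one so the code also imports elsewhere.
    if hasattr(socket, "TCP_KEEPIDLE"):
        # Seconds of idle time before the first probe is sent.
        options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle))
    if hasattr(socket, "TCP_KEEPINTVL"):
        # Seconds between probes.
        options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval))
    if hasattr(socket, "TCP_KEEPCNT"):
        # Unanswered probes before the connection is considered dead.
        options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count))
    return options


# To apply this to urllib3 (and therefore requests), extend the defaults
# before the first request is made, as in the snippet above:
# from urllib3.connection import HTTPConnection
# HTTPConnection.default_socket_options = (
#     HTTPConnection.default_socket_options + keepalive_socket_options()
# )
print(keepalive_socket_options())
```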

    Anna Geller

    10 months ago
    Does this job run for a very long time?

    Charles Hunt

    10 months ago
    the REST endpoint takes about 10 mins (600 secs) to respond if there is data available to process.

    Anna Geller

    10 months ago
    my idea was that since you mentioned you run your agent on the same host as the API now, that you may change this to localhost:
    apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
    c

    Charles Hunt

    10 months ago
    ok - I will try
    apiendpointurl = 'http://localhost:5000/SimpleIngest'

    Anna Geller

    10 months ago
    overall, it’s hard for me to help since I don’t know your API. But I would try determining which component is a root cause here. You can try running this ingest job as a normal Python script without Prefect so that you can check whether this is a Prefect issue or a general DevOps configuration issue with the VM or API. If Prefect is not an issue, then you could investigate the API and networking a bit more.
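One way to do the isolation suggested here is to run the same call from a plain script and record how long it takes before it returns or fails, then compare that with the 30-minute mark seen in the flow. A minimal sketch using only the standard library; `time_call` is a hypothetical helper, and `time.sleep` stands in for the real `requests.post(...)` call.

```python
import time


def time_call(fn, *args, **kwargs):
    """Call fn once and return (elapsed_seconds, result, error)."""
    start = time.monotonic()
    try:
        result, error = fn(*args, **kwargs), None
    except Exception as exc:
        # Keep the exception so the caller can inspect its type and message.
        result, error = None, exc
    return time.monotonic() - start, result, error


# Stand-in call; in a real check this would be the API request from the flow.
elapsed, result, error = time_call(time.sleep, 0.1)
print(f"finished after {elapsed:.1f}s, error={error!r}")
```

Running the same script on the VM, and then inside a container on the cluster, would show which environment cuts the connection.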

    Charles Hunt

    10 months ago
    I have successfully been able to run long running API requests against the VM based endpoint on azure - so I eliminated that as the root cause - I believe now the issue resides in the python urllib3 stack with perhaps the complication of some azure networking.
    so I am trying the "localhost" idea in the hope the urllib3 python stack behaves better
    I am not a native python developer, but of course it is the language of choice for writing the flows - I feel I have got some way to getting what I need from this, but it is just a few troubling flows that are tripping me up and it all revolves around the REST api request.
    I just tested the flow using the "localhost:5000" - and it seemed to work - the response came back after 10 mins
    👏
    Thank you @Anna Geller - your ideas really helped me.

    Anna Geller

    10 months ago
    Nice work! Great to hear that!