Charles Hunt
11/02/2021, 9:04 AMAnna Geller
Charles Hunt
11/02/2021, 10:49 AM#call the simpleingest endpoint
 response = <http://requests.post|requests.post>(apiendpointurl, headers=headers, data = requestdata, timeout=14400)Charles Hunt
11/02/2021, 10:49 AMexcept:
     logger.error("General runtime error during API call.")
     raise RuntimeError("General runtime error during API call.")Charles Hunt
11/02/2021, 10:50 AMCharles Hunt
11/02/2021, 10:51 AMTask 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peerCharles Hunt
11/02/2021, 10:56 AM# prefect libraries for accessing prefect runtime api
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret
# psycp[g2 libraries for accessing PostgreSQL
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor
# used for making API calls
import requests
# used for date functions
from datetime import datetime, timedelta
# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True
@task(log_stdout=True)
def ingest_task():
    logger = prefect.context.get("logger")
    apiendpointurl = '<http://10.10.10.10:5000/SimpleIngest>'
    headers = {
        'Content-type':'application/json',
        'api-key':'secretkey'
    }
    # get db config
    dbconfig = Secret("fsdb").get()
    # set up default hwm date and days
    hwmdate = (datetime.today() - timedelta(days = 1)).strftime('%Y/%m/%d')
    days = 0
    
    # get db hwm
    with connect(f"dbname={dbconfig['dbname']} host={dbconfig['host']} user={dbconfig['user']} password={dbconfig['password']} sslmode=require", cursor_factory=RealDictCursor) as pgcon:
        with pgcon.cursor() as pgcursor:
            # run the sql
            pgcursor.execute("SELECT max(next_max_hwm) as hwm FROM staging.high_water_marks where source_table = 'AAAAA_BBBBBB';")
            if pgcursor.rowcount > 0 :
                # extract the result
                pgrecord = pgcursor.fetchone()
                <http://logger.info|logger.info>(f"Control: High water mark date: {pgrecord['hwm']}")
                hwm = datetime.date(pgrecord['hwm'])                
                hwmdate = hwm.strftime('%Y/%m/%d')
                days = (datetime.date(datetime.today()) - hwm).days    
                <http://logger.info|logger.info>(f"Number of days delta: {days}")        
                if days > 0 :
                    requestdata = f"{{'JobName': 'delta_prod','DataJobId': 105, 'JobPathId': 'YYYYYYYY', 'DataJobType': 'Bounces', 'StartDate': '{hwmdate}', 'NumberOfDays': {days}, 'Identifiers': '1D3E4DAC-00000-0000-0000-10BD9A9869BC,ZZZ'}}"
                    # log the endpoint
                    <http://logger.info|logger.info>(apiendpointurl)
                    # log the inputs
                    <http://logger.info|logger.info>(requestdata)
                    
                    try:
                        #call the simpleingest endpoint
                        response = <http://requests.post|requests.post>(apiendpointurl, headers=headers, data = requestdata, timeout=14400)
                        if response.text.__contains__("completedSuccess") == False:
                            logger.error(response.json())
                            raise RuntimeError('SmartDataLoader API did not complete successfully')      
                    except RuntimeError as exc:
                        logger.error("API runtime failure.")
                        raise RuntimeError from exc
                    except TimeoutError as exc:
                        logger.error("API Timeout")
                        raise TimeoutError from exc
                    except:
                        logger.error("General runtime error during API call.")
                        raise RuntimeError("General runtime error during API call.")
                    else:
                        <http://logger.info|logger.info>(response.json())
                else:
                    <http://logger.info|logger.info>("Up to date with delta")
with Flow("git-prod-bronto-bounces-flow") as flow:
    ingest_task()
flow.storage = GitHub(
    repo="XXXXXXX",
    path="YYYYYYYY/flows/production/bronto/bronto_bounces_delta_flow.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
    )
flow.run_config = KubernetesRun(labels=["prod"],env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})
flow.register(project_name="YYYYYYYY")Charles Hunt
11/02/2021, 10:57 AMCharles Hunt
11/02/2021, 10:58 AMCharles Hunt
11/02/2021, 11:00 AMCharles Hunt
11/02/2021, 11:01 AMCharles Hunt
11/02/2021, 11:02 AMCharles Hunt
11/02/2021, 11:05 AM03:26:13
ERROR
ingest_task
General runtime error during API call.
03:26:13
ERROR
CloudTaskRunner
Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 532, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
    response.begin()
  File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<string>", line 64, in ingest_task
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
    return request('post', url, data=data, json=json, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "<string>", line 78, in ingest_task
RuntimeError: General runtime error during API call.Charles Hunt
11/02/2021, 11:10 AMAnna Geller
Charles Hunt
11/02/2021, 11:37 AMAnna Geller
Charles Hunt
11/02/2021, 1:04 PMAnna Geller
prefect agent local startCharles Hunt
11/05/2021, 12:00 PMCharles Hunt
11/05/2021, 12:08 PMimport socket
from urllib3.connection import HTTPConnection
TCP_KEEPALIVE = 0x10
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    (socket.IPPROTO_TCP, TCP_KEEPALIVE, 60),
]Charles Hunt
11/05/2021, 12:09 PMCharles Hunt
11/05/2021, 12:09 PMAnna Geller
Charles Hunt
11/05/2021, 12:10 PMAnna Geller
apiendpointurl = '<http://10.10.10.10:5000/SimpleIngest>'Charles Hunt
11/05/2021, 12:11 PMapiendpointurl = '<http://localhost:5000/SimpleIngest>'Anna Geller
Charles Hunt
11/05/2021, 12:26 PMCharles Hunt
11/05/2021, 12:27 PMCharles Hunt
11/05/2021, 12:30 PMCharles Hunt
11/05/2021, 12:31 PMCharles Hunt
11/05/2021, 12:31 PMCharles Hunt
11/05/2021, 12:31 PMAnna Geller
