Charles Hunt
11/02/2021, 9:04 AMAnna Geller
Charles Hunt
11/02/2021, 10:49 AM#call the simpleingest endpoint
response = <http://requests.post|requests.post>(apiendpointurl, headers=headers, data = requestdata, timeout=14400)
Charles Hunt
11/02/2021, 10:49 AMexcept:
logger.error("General runtime error during API call.")
raise RuntimeError("General runtime error during API call.")
Charles Hunt
11/02/2021, 10:50 AMCharles Hunt
11/02/2021, 10:51 AMTask 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
response.begin()
File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer
Charles Hunt
11/02/2021, 10:56 AM# prefect libraries for accessing prefect runtime api
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret
# psycopg2 libraries for accessing PostgreSQL
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor
# used for making API calls
import requests
# used for date functions
from datetime import datetime, timedelta
# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True
@task(log_stdout=True)
def ingest_task():
    """Request a delta ingest from the SimpleIngest API if the high-water mark is behind today.

    Reads the latest high-water mark (hwm) for source table 'AAAAA_BBBBBB' from
    staging.high_water_marks, computes the number of days between that mark and
    today, and, when that number is positive, POSTs an ingest request covering
    the gap. When no high-water mark is stored, or it is not in the past,
    nothing is requested.

    Raises:
        TimeoutError: the HTTP request timed out.
        RuntimeError: the HTTP request failed for any other reason, or the
            response body does not contain "completedSuccess".
    """
    import json  # function-scope import: only this task serialises a request body

    logger = prefect.context.get("logger")
    # Plain URL; the angle brackets present in the pasted source were chat link
    # markup, not part of the address.
    apiendpointurl = 'http://10.10.10.10:5000/SimpleIngest'
    headers = {
        'Content-type': 'application/json',
        'api-key': 'secretkey'
    }

    # get db config
    dbconfig = Secret("fsdb").get()

    # Defaults used when no high-water mark is stored: zero days means
    # "nothing to ingest".
    hwmdate = (datetime.today() - timedelta(days=1)).strftime('%Y/%m/%d')
    days = 0

    # get db hwm
    # Keyword arguments instead of a hand-built DSN string, so values that
    # contain spaces or quotes (e.g. a password) cannot break the parsing.
    pgcon = connect(
        dbname=dbconfig['dbname'],
        host=dbconfig['host'],
        user=dbconfig['user'],
        password=dbconfig['password'],
        sslmode='require',
        cursor_factory=RealDictCursor,
    )
    try:
        # `with pgcon` only scopes the transaction; psycopg2 does not close the
        # connection on exit, hence the explicit close() in `finally`.
        with pgcon, pgcon.cursor() as pgcursor:
            pgcursor.execute("SELECT max(next_max_hwm) as hwm FROM staging.high_water_marks where source_table = 'AAAAA_BBBBBB';")
            pgrecord = pgcursor.fetchone()
    finally:
        pgcon.close()

    # An aggregate query always yields one row, so "no data" shows up as a
    # NULL hwm rather than as an empty result set.
    hwmvalue = pgrecord['hwm'] if pgrecord is not None else None
    if hwmvalue is None:
        logger.warning("Control: no high water mark found; skipping delta calculation")
    else:
        logger.info(f"Control: High water mark date: {hwmvalue}")
        # Accept both timestamp and date columns (datetime is a subclass of
        # date, so it has to be tested first).
        hwm = hwmvalue.date() if isinstance(hwmvalue, datetime) else hwmvalue
        hwmdate = hwm.strftime('%Y/%m/%d')
        days = (datetime.today().date() - hwm).days
        logger.info(f"Number of days delta: {days}")

    if days <= 0:
        logger.info("Up to date with delta")
        return

    # Serialise with json.dumps so the body is valid JSON, matching the
    # declared Content-type (the hand-built string used single quotes).
    requestdata = json.dumps({
        'JobName': 'delta_prod',
        'DataJobId': 105,
        'JobPathId': 'YYYYYYYY',
        'DataJobType': 'Bounces',
        'StartDate': hwmdate,
        'NumberOfDays': days,
        'Identifiers': '1D3E4DAC-00000-0000-0000-10BD9A9869BC,ZZZ',
    })
    # log the endpoint
    logger.info(apiendpointurl)
    # log the inputs
    logger.info(requestdata)

    try:
        # call the simpleingest endpoint
        response = requests.post(apiendpointurl, headers=headers, data=requestdata, timeout=14400)
    except requests.exceptions.Timeout as exc:
        # requests raises its own Timeout type, which is not a subclass of the
        # built-in TimeoutError; translate it so callers still see TimeoutError.
        logger.error("API Timeout")
        raise TimeoutError("API Timeout") from exc
    except requests.exceptions.RequestException as exc:
        # Connection resets, DNS failures, etc. Chain the cause so the real
        # error stays visible in the task's traceback.
        logger.error(f"General runtime error during API call: {exc!r}")
        raise RuntimeError("General runtime error during API call.") from exc

    if "completedSuccess" not in response.text:
        # Log the raw text: a failing response is not guaranteed to be JSON.
        logger.error(response.text)
        logger.error("API runtime failure.")
        raise RuntimeError('SmartDataLoader API did not complete successfully')

    logger.info(response.text)
# Assemble the flow: a single task with no upstream dependencies.
with Flow("git-prod-bronto-bounces-flow") as flow:
    ingest_task()

# Where the flow's source file is stored (a GitHub repository).
github_storage = GitHub(
    repo="XXXXXXX",
    path="YYYYYYYY/flows/production/bronto/bronto_bounces_delta_flow.py",
    access_token_secret="GITHUB_ACCESS_TOKEN",
)

# Run configuration: Kubernetes, labelled "prod", with the extra pip packages
# the task needs passed through the environment.
k8s_run_config = KubernetesRun(
    labels=["prod"],
    env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"},
)

flow.storage = github_storage
flow.run_config = k8s_run_config

# Register the flow under the project.
flow.register(project_name="YYYYYYYY")
Charles Hunt
11/02/2021, 10:57 AMCharles Hunt
11/02/2021, 10:58 AMCharles Hunt
11/02/2021, 11:00 AMCharles Hunt
11/02/2021, 11:01 AMCharles Hunt
11/02/2021, 11:02 AMCharles Hunt
11/02/2021, 11:05 AM03:26:13
ERROR
ingest_task
General runtime error during API call.
03:26:13
ERROR
CloudTaskRunner
Task 'ingest_task': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
response.begin()
File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 532, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 769, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 445, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 440, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.7/http/client.py", line 1373, in getresponse
response.begin()
File "/usr/local/lib/python3.7/http/client.py", line 319, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.7/http/client.py", line 280, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<string>", line 64, in ingest_task
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 119, in post
return request('post', url, data=data, json=json, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/api.py", line 61, in request
return session.request(method=method, url=url, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "<string>", line 78, in ingest_task
RuntimeError: General runtime error during API call.
Charles Hunt
11/02/2021, 11:10 AMAnna Geller
Charles Hunt
11/02/2021, 11:37 AMAnna Geller
Charles Hunt
11/02/2021, 1:04 PMAnna Geller
prefect agent local start
But you may want to set up Supervisor (supervisord) so that the agent runs as a background process:
• Docs: https://docs.prefect.io/orchestration/agents/local.html#flow-configuration
• Supervisor: https://docs.prefect.io/orchestration/agents/local.html#using-with-supervisor
Charles Hunt
11/05/2021, 12:00 PMCharles Hunt
11/05/2021, 12:08 PMimport socket
from urllib3.connection import HTTPConnection
# Socket option that sets how long a connection may sit idle before the first
# keepalive probe is sent. Where Python exposes socket.TCP_KEEPIDLE (e.g. on
# Linux), use it; the literal 0x10 is the macOS TCP_KEEPALIVE option number and
# refers to a different TCP option on Linux, so it is kept only as a fallback.
TCP_KEEPALIVE = getattr(socket, "TCP_KEEPIDLE", 0x10)

# Extend urllib3's default socket options so every new HTTP connection enables
# keepalive and starts probing after 60 seconds of idle time.
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    (socket.IPPROTO_TCP, TCP_KEEPALIVE, 60),
]
Charles Hunt
11/05/2021, 12:09 PMCharles Hunt
11/05/2021, 12:09 PMAnna Geller
Charles Hunt
11/05/2021, 12:10 PMAnna Geller
apiendpointurl = '<http://10.10.10.10:5000/SimpleIngest>'
Charles Hunt
11/05/2021, 12:11 PMapiendpointurl = '<http://localhost:5000/SimpleIngest>'
Anna Geller
Charles Hunt
11/05/2021, 12:26 PMCharles Hunt
11/05/2021, 12:27 PMCharles Hunt
11/05/2021, 12:30 PMCharles Hunt
11/05/2021, 12:31 PMCharles Hunt
11/05/2021, 12:31 PMCharles Hunt
11/05/2021, 12:31 PMAnna Geller