Sabir Ali
07/13/2022, 5:21 PM
Traceback (most recent call last):
  File "/Users/sabirali/PycharmProjects/ETL/ElasticSearchClientTest.py", line 5, in <module>
    print(client.info())
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
    return api(*args, **kwargs)
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/__init__.py", line 2277, in info
    return self.perform_request( # type: ignore[return-value]
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/_base.py", line 332, in perform_request
    raise UnsupportedProductError(
elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product
I used the following command to install the Elasticsearch client:
(venv) sabirali@Sabirs-MacBook-Pro ETL % pip install elasticsearch
Collecting elasticsearch
Using cached elasticsearch-8.3.1-py3-none-any.whl (382 kB)
Requirement already satisfied: elastic-transport<9,>=8 in ./venv/lib/python3.8/site-packages (from elasticsearch) (8.1.2)
Requirement already satisfied: urllib3<2,>=1.26.2 in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (1.26.9)
Requirement already satisfied: certifi in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (2022.6.15)
Installing collected packages: elasticsearch
Successfully installed elasticsearch-8.3.1
WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.
You should consider upgrading via the '/Users/sabirali/PycharmProjects/ETL/venv/bin/python -m pip install --upgrade pip' command.
(venv) sabirali@Sabirs-MacBook-Pro ETL %
Anna Geller
07/13/2022, 6:11 PM
{
  "name": "seamless-unified.novalocal",
  "cluster_name": "elasticsearch",
  "cluster_uuid": "KGa8W06wQmieog17sV9pyg",
  "version": {
    "number": "7.8.0",
    "build_flavor": "default",
    "build_type": "tar",
    "build_hash": "757314695644ea9a1dc2fecd26d1a43856725e65",
    "build_date": "2020-06-14T19:35:50.234439Z",
    "build_snapshot": false,
    "lucene_version": "8.5.1",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
  },
  "tagline": "You Know, for Search"
}
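The `version.number` in this response (7.8.0) is one major version behind the 8.3.1 client installed earlier, which is the likely cause of the `UnsupportedProductError`. A minimal sketch of a pre-flight check that compares major versions follows. `majors_match` is a hypothetical helper, not part of the `elasticsearch` package, and the client version is passed in as a tuple on the assumption that it would be read from `elasticsearch.__version__`.

```python
import json


def majors_match(client_version, server_info):
    """Return True when the client's major version equals the server's.

    client_version: tuple such as (8, 3, 1) (assumed to come from
                    elasticsearch.__version__).
    server_info:    dict shaped like the response of client.info().
    """
    server_major = int(server_info["version"]["number"].split(".")[0])
    return client_version[0] == server_major


# Trimmed copy of the server response shown above.
server_info = json.loads('{"version": {"number": "7.8.0"}}')

print(majors_match((8, 3, 1), server_info))  # False: client from plain `pip install elasticsearch`
print(majors_match((7, 6, 0), server_info))  # True: client pinned to a 7.x release
```

Pinning the client to the server's major version, for example with `pip install elasticsearch==7.6.0`, is one way to make the check pass.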
Sabir Ali
07/13/2022, 7:44 PM
pip install elasticsearch==7.6.0

import prefect
from prefect import task, Flow
from datetime import datetime
from elasticsearch import Elasticsearch

client = Elasticsearch(["http://10.91.9.220:9200"])
logger = prefect.context.get("logger")

class GreyModel:
    def __init__(self, msisdn="", status="", greyDate=""):
        self.msisdn = msisdn
        self.status = status
        self.greyDate = greyDate

@task
def extract(path):
    rows = []
    with open(path, "r") as greyListFile:
        for line in greyListFile:
            data = line.strip().split(',')
            rows.append(GreyModel(data[0], data[1], data[2]))
        greyListFile.close()
    return rows

@task
def transform(data):
    for line in data:
        logger.info("MSISDN "+line.msisdn)
    return data

@task
def load(rows, path):
    for line in rows:
        doc = {
            'date': line.greyDate,
            'grey_status': line.status,
            'timestamp': datetime.now(),
        }
        res = client.index(index="grey_list", id=line.msisdn, body=doc)
        print(res['result'])
    print("Number of rows indexed: "+len(rows))
    return

with Flow("greylist_etl") as flow:
    filePath = "Grey_Feb_04022022.csv"
    data = extract(filePath)
    rows = transform(data)
    load(rows, filePath)

flow.run()
Anna Geller
07/13/2022, 9:00 PM
Sabir Ali
07/14/2022, 8:36 AM
def load(rows, path):
check this task
Anna Geller
07/14/2022, 11:29 AM
logger = prefect.context.get("logger")
You need to get the logger inside the task:
@task
def transform(data):
    logger = prefect.context.get("logger")
    for line in data:
        logger.info("MSISDN "+line.msisdn)
    return data
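Separately from the logger placement, the last `print` in the `load` task above concatenates a string with `len(rows)`, which is an `int`, and Python raises a `TypeError` for that. A minimal sketch of one way to build the message, using a hypothetical helper named `summary_line` that is not in the original flow:

```python
def summary_line(rows):
    """Build the summary message; the f-string converts the count to text."""
    return f"Number of rows indexed: {len(rows)}"


print(summary_line(["a", "b", "c"]))  # Number of rows indexed: 3
```

Inside `load`, the same f-string could replace the `"..." + len(rows)` expression directly.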