I am trying to connect elasticsearch client but ge...
# prefect-community
s
I am trying to connect with the Elasticsearch client but I am getting an exception
Traceback (most recent call last):
  File "/Users/sabirali/PycharmProjects/ETL/ElasticSearchClientTest.py", line 5, in <module>
    print(client.info())
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
    return api(*args, **kwargs)
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/__init__.py", line 2277, in info
    return self.perform_request(  # type: ignore[return-value]
  File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/_base.py", line 332, in perform_request
    raise UnsupportedProductError(
elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product
I used the following command to install the Elasticsearch client
(venv) sabirali@Sabirs-MacBook-Pro ETL % pip install elasticsearch      
Collecting elasticsearch
  Using cached elasticsearch-8.3.1-py3-none-any.whl (382 kB)
Requirement already satisfied: elastic-transport<9,>=8 in ./venv/lib/python3.8/site-packages (from elasticsearch) (8.1.2)
Requirement already satisfied: urllib3<2,>=1.26.2 in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (1.26.9)
Requirement already satisfied: certifi in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (2022.6.15)
Installing collected packages: elasticsearch
Successfully installed elasticsearch-8.3.1
WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.
You should consider upgrading via the '/Users/sabirali/PycharmProjects/ETL/venv/bin/python -m pip install --upgrade pip' command.
(venv) sabirali@Sabirs-MacBook-Pro ETL %
a
can you move the code blocks to the thread?
your past message:
{
  "name": "seamless-unified.novalocal",
  "cluster_name": "elasticsearch",
  "cluster_uuid": "KGa8W06wQmieog17sV9pyg",
  "version": {
    "number": "7.8.0",
    "build_flavor": "default",
    "build_type": "tar",
    "build_hash": "757314695644ea9a1dc2fecd26d1a43856725e65",
    "build_date": "2020-06-14T19:35:50.234439Z",
    "build_snapshot": false,
    "lucene_version": "8.5.1",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
  },
  "tagline": "You Know, for Search"
}
how is this related to Prefect?
s
I am writing an ETL pipeline using Prefect
It was fixed by installing the following version
pip install elasticsearch==7.6.0
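A likely reason the pin helps: the server response earlier in the thread reports version 7.8.0, while pip had installed client 8.3.1. Below is a minimal sketch of a major-version check, assuming the usual guidance that the client's major version should match the server's. `majors_match` is a hypothetical helper for illustration, not part of the elasticsearch package.

```python
def majors_match(client_version, server_version):
    """Return True when client and server share the same major version.

    client_version: tuple of ints, e.g. (8, 3, 1)
    server_version: string as reported under "version" -> "number", e.g. "7.8.0"
    """
    server_major = int(server_version.split(".")[0])
    return client_version[0] == server_major


# Values taken from the thread: client 8.3.1 against server 7.8.0,
# then the pinned client 7.6.0 against the same server.
print(majors_match((8, 3, 1), "7.8.0"))  # False
print(majors_match((7, 6, 0), "7.8.0"))  # True
```

In a real script the two inputs would probably come from the installed package's version and from `client.info()["version"]["number"]`; that wiring is an assumption and is left out here so the sketch runs without a cluster.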
import prefect
from prefect import task, Flow
from datetime import datetime
from elasticsearch import Elasticsearch

client = Elasticsearch(["http://10.91.9.220:9200"])

logger = prefect.context.get("logger")


class GreyModel:
    def __init__(self, msisdn="", status="", greyDate= ""):
        self.msisdn = msisdn
        self.status = status
        self.greyDate = greyDate

@task
def extract(path):

    rows = []

    with open(path, "r") as greyListFile:
        for line in greyListFile:
            data = line.strip().split(',')
            rows.append(GreyModel(data[0], data[1], data[2]))
    # No explicit close() needed: the with block closes the file.
    return rows


@task
def transform(data):
    for line in data:
        logger.info("MSISDN "+line.msisdn)
    return data


@task
def load(rows, path):
    for line in rows:
        doc = {
            'date': line.greyDate,
            'grey_status': line.status,
            'timestamp': datetime.now(),
        }
        res = client.index(index="grey_list", id=line.msisdn, body=doc)
        print(res['result'])
    # len() returns an int, so convert it before concatenating with a str.
    print("Number of rows indexed: " + str(len(rows)))
    return


with Flow("greylist_etl") as flow:
    filePath = "Grey_Feb_04022022.csv"
    data = extract(filePath)
    rows = transform(data)
    load(rows, filePath)


flow.run()
The aforementioned issue is fixed, but loading data into Elasticsearch is slow
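One thing that may be worth trying for the slow load: the `load` task above sends one `client.index` request per row, and the elasticsearch package ships a `bulk` helper in `elasticsearch.helpers` that batches documents into fewer requests. The following is a rough sketch under that assumption, not a measured fix. `build_actions` and `bulk_load` are hypothetical names, and the sample row is made up for illustration.

```python
from datetime import datetime
from types import SimpleNamespace


def build_actions(rows, index="grey_list"):
    """Yield one bulk action per row (rows need msisdn, status, greyDate)."""
    for row in rows:
        yield {
            "_index": index,
            "_id": row.msisdn,
            "_source": {
                "date": row.greyDate,
                "grey_status": row.status,
                "timestamp": datetime.now(),
            },
        }


def bulk_load(client, rows):
    """Index all rows through the bulk helper and return the success count."""
    # Imported lazily so build_actions can be tried without the client installed.
    from elasticsearch.helpers import bulk

    indexed, _errors = bulk(client, build_actions(rows))
    return indexed


# Made-up sample row, only to show the shape of a generated action.
sample = [SimpleNamespace(msisdn="123", status="grey", greyDate="2022-02-04")]
actions = list(build_actions(sample))
print(actions[0]["_id"], actions[0]["_source"]["grey_status"])
```

Inside the flow, `bulk_load(client, rows)` would take the place of the per-row loop in `load`. Whether it helps depends on the cluster and the network, so timing both versions on the same file is the safer way to decide.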
a
I don't know enough about your setup to give any immediate recommendations. how would you troubleshoot this just in Python without Prefect?
s
I am using Prefect
I am reading the data from a file and then ingesting it into Elasticsearch in a Prefect flow task
I have shared the flow above
def load(rows, path):
check this task
a
the only thing wrong in your setup is the logger: get it inside every task that needs it, rather than defining it globally. Instead of the global:
logger = prefect.context.get("logger")
you need:
@task
def transform(data):
    logger = prefect.context.get("logger")
    for line in data:
        logger.info("MSISDN "+line.msisdn)
    return data