Sabir Ali

    2 months ago
    I am trying to connect with the Elasticsearch client but I am getting an exception:
    Traceback (most recent call last):
      File "/Users/sabirali/PycharmProjects/ETL/ElasticSearchClientTest.py", line 5, in <module>
        print(client.info())
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
        return api(*args, **kwargs)
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/__init__.py", line 2277, in info
        return self.perform_request(  # type: ignore[return-value]
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/_base.py", line 332, in perform_request
        raise UnsupportedProductError(
    elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product
    I used the following command to install the Elasticsearch client:
    (venv) sabirali@Sabirs-MacBook-Pro ETL % pip install elasticsearch      
    Collecting elasticsearch
      Using cached elasticsearch-8.3.1-py3-none-any.whl (382 kB)
    Requirement already satisfied: elastic-transport<9,>=8 in ./venv/lib/python3.8/site-packages (from elasticsearch) (8.1.2)
    Requirement already satisfied: urllib3<2,>=1.26.2 in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (1.26.9)
    Requirement already satisfied: certifi in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (2022.6.15)
    Installing collected packages: elasticsearch
    Successfully installed elasticsearch-8.3.1
    WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.
    You should consider upgrading via the '/Users/sabirali/PycharmProjects/ETL/venv/bin/python -m pip install --upgrade pip' command.
    (venv) sabirali@Sabirs-MacBook-Pro ETL %
    Anna Geller

    2 months ago
    can you move the code blocks to the thread?
    your past message:
    {
      "name": "seamless-unified.novalocal",
      "cluster_name": "elasticsearch",
      "cluster_uuid": "KGa8W06wQmieog17sV9pyg",
      "version": {
        "number": "7.8.0",
        "build_flavor": "default",
        "build_type": "tar",
        "build_hash": "757314695644ea9a1dc2fecd26d1a43856725e65",
        "build_date": "2020-06-14T19:35:50.234439Z",
        "build_snapshot": false,
        "lucene_version": "8.5.1",
        "minimum_wire_compatibility_version": "6.8.0",
        "minimum_index_compatibility_version": "6.0.0-beta1"
      },
      "tagline": "You Know, for Search"
    }
    how is this related to Prefect?
    Sabir Ali

    2 months ago
    I am writing an ETL pipeline using Prefect.
    The exception was fixed by installing the following client version:
    pip install elasticsearch==7.6.0
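The pin likely works because the server in this thread reports version 7.8.0, while the plain `pip install elasticsearch` pulled the 8.3.1 client. A minimal sketch of a pre-flight check follows. It assumes a rule of thumb: the client's major version should match the server's, and the client's minor version should not be newer than the server's. The helper names `parse_version` and `client_matches_server` are hypothetical and are not part of the elasticsearch package.

```python
def parse_version(version):
    """Turn a version string such as "7.8.0" into a tuple of ints, e.g. (7, 8, 0)."""
    return tuple(int(part) for part in version.split(".")[:3])


def client_matches_server(client_version, server_version):
    """Return True if the majors match and the client's minor is not newer.

    Assumption: this is a rule of thumb for illustration,
    not an official compatibility matrix.
    """
    client = parse_version(client_version)
    server = parse_version(server_version)
    return client[0] == server[0] and client[1] <= server[1]


# Versions taken from the thread: server 7.8.0, clients 8.3.1 and 7.6.0.
print(client_matches_server("8.3.1", "7.8.0"))  # False
print(client_matches_server("7.6.0", "7.8.0"))  # True
```

The server version here is the `version.number` field of the JSON response quoted earlier in the thread, and the client versions are the ones shown by pip.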
    import prefect
    from prefect import task, Flow
    from datetime import datetime
    from elasticsearch import Elasticsearch
    
    client = Elasticsearch(["http://10.91.9.220:9200"])
    
    logger = prefect.context.get("logger")
    
    
    class GreyModel:
        def __init__(self, msisdn="", status="", greyDate= ""):
            self.msisdn = msisdn
            self.status = status
            self.greyDate = greyDate
    
    @task
    def extract(path):
    
        rows = []
    
        with open(path, "r") as greyListFile:
            for line in greyListFile:
                data = line.strip().split(',')
                rows.append(GreyModel(data[0], data[1], data[2]))
        return rows
    
    
    @task
    def transform(data):
        for line in data:
            logger.info("MSISDN "+line.msisdn)
        return data
    
    
    @task
    def load(rows, path):
        for line in rows:
            doc = {
                'date': line.greyDate,
                'grey_status': line.status,
                'timestamp': datetime.now(),
            }
            res = client.index(index="grey_list", id=line.msisdn, body=doc)
            print(res['result'])
        print("Number of rows indexed: " + str(len(rows)))
        return
    
    
    with Flow("greylist_etl") as flow:
        filePath = "Grey_Feb_04022022.csv"
        data = extract(filePath)
        rows = transform(data)
        load(rows, filePath)
    
    
    flow.run()
    The aforementioned issue is fixed, but loading data into Elasticsearch is slow.
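One possible cause of the slowness is that `load` sends one HTTP request per row through `client.index`. A minimal sketch of batching the rows with `helpers.bulk` from the elasticsearch package instead is shown here. This is an assumption about where the time goes, not a measured result. `generate_actions` and `bulk_load` are hypothetical names, the sample row is made up, and `GreyModel` is repeated from the flow so the block runs on its own.

```python
from datetime import datetime


class GreyModel:
    def __init__(self, msisdn="", status="", greyDate=""):
        self.msisdn = msisdn
        self.status = status
        self.greyDate = greyDate


def generate_actions(rows, index="grey_list"):
    """Yield one bulk action per row, mirroring the document built in load()."""
    for row in rows:
        yield {
            "_index": index,
            "_id": row.msisdn,
            "_source": {
                "date": row.greyDate,
                "grey_status": row.status,
                "timestamp": datetime.now(),
            },
        }


def bulk_load(client, rows):
    """Send all rows through the bulk API instead of one request per row."""
    # Imported here so the rest of the block runs without the package installed.
    from elasticsearch import helpers

    success_count, errors = helpers.bulk(client, generate_actions(rows))
    return success_count, errors


# Made-up sample row, only to show the shape of a generated action.
rows = [GreyModel("923001234567", "grey", "2022-02-04")]
actions = list(generate_actions(rows))
print(actions[0]["_index"], actions[0]["_id"])
```

Inside the flow, the `load` task could call `bulk_load(client, rows)` in place of the per-row loop. Whether this removes the bottleneck depends on the cluster and the file size.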
    Anna Geller

    2 months ago
    I don't know enough about your setup to give any immediate recommendations. how would you troubleshoot this just in Python without Prefect?
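One way to approach that question is to time the load step as a plain Python function, with no Prefect involved. A minimal sketch follows, assuming a hypothetical `timed` helper and a stand-in `fake_load` function in place of the real Elasticsearch call.

```python
import time


def timed(func, *args, **kwargs):
    """Call func and return a (result, elapsed_seconds) pair."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


def fake_load(rows):
    """Stand-in for the real load step; swap in the code that calls client.index."""
    return len(rows)


result, elapsed = timed(fake_load, ["row-1", "row-2", "row-3"])
print(f"loaded {result} rows in {elapsed:.6f}s")
```

If the plain function is just as slow as the Prefect task, the bottleneck is in the Elasticsearch calls rather than in Prefect.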
    Sabir Ali

    2 months ago
    I am using Prefect.
    I am reading the data from a file and then ingesting it into Elasticsearch using a Prefect flow task.
    I have shared the flow above.
    def load(rows, path):
    check this task
    Anna Geller

    2 months ago
    The only issue I see in your setup is the logger: you should get it inside every task that needs it, rather than defining it globally. Instead of the global:
    logger = prefect.context.get("logger")
    you need:
    @task
    def transform(data):
        logger = prefect.context.get("logger")
        for line in data:
            logger.info("MSISDN "+line.msisdn)
        return data