
jcozar

08/05/2022, 7:42 AM
Hi everyone! I'm interested in open data catalog services. I'm trying OpenMetadata and Amundsen. I've seen that both only support Apache Airflow for ETL orchestration. Do you know if Prefect works with OpenMetadata, Amundsen, or other similar products? Thank you!

Amanda Wee

08/05/2022, 8:22 AM
Looking at Amundsen, the overview says: "Users could either load the data with a python script with the library or with an Airflow DAG importing the library." That sounds like such a Python script could be easily converted to a Prefect flow.
👍 1

jcozar

08/05/2022, 9:02 AM
Thanks for your response @Amanda Wee! I'm just exploring technologies, so I'm a newbie at Amundsen. I will try it!

George Coyne

08/05/2022, 2:48 PM
Using Prefect with Amundsen is super easy!
What version of Prefect are you on?
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is an example script that demonstrates how to load data into Neo4j without using an Airflow DAG.
"""

import logging
import uuid

from elasticsearch.client import Elasticsearch
from pyhocon import ConfigFactory
from prefect import task, Flow
from prefect.client import Secret
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.snowflake_metadata_extractor import (
    SnowflakeMetadataExtractor,
)
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import (
    FSElasticsearchJSONLoader,
)
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
# Disable snowflake logging
logging.getLogger("snowflake.connector.network").disabled = True

host = "<http://amundsen-neo4j.example.io|amundsen-neo4j.example.io>"

# set env NEO4J_HOST to override localhost
neo4j_endpoint = f"bolt://{host}:7687"
# neo4j_endpoint = "<http://amundsen.example.io/neo4j|amundsen.example.io/neo4j>"

neo4j_user = "neo4j"
neo4j_password = "test"

IGNORED_SCHEMAS = ["'DVCORE'", "'INFORMATION_SCHEMA'", "'STAGE_ORACLE'"]

es_host = "<http://amundsen-es.example.io|amundsen-es.example.io>"
# neo_host = None

es = Elasticsearch(
    [
        {"host": es_host if es_host else "amundsen-elasticsearch-client"},
    ]
)

# TODO: connection string needs to change
# @task(name="Create Snowflake Connection String")
# def connection_string(database: str):
#     # Refer to https://docs.snowflake.com/en/user-guide/sqlalchemy.html#connection-parameters
#     # for supported connection parameters and configurations


@task(name="Snowflake Metadata Job")
def snowflake_metadata_job(database: str):
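    """
    Extract table and column metadata from one Snowflake database with the
    Amundsen databuilder and publish it to the Amundsen Neo4j backend.
    """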
    where_clause = f"WHERE c.TABLE_SCHEMA not in ({','.join(IGNORED_SCHEMAS)}) \
            AND c.TABLE_SCHEMA not like 'STAGE_%' \
            AND c.TABLE_SCHEMA not like 'HIST_%' \
            AND c.TABLE_SCHEMA not like 'SNAP_%' \
            AND c.TABLE_SCHEMA not like 'AWELCH%' \
            AND c.TABLE_SCHEMA not like '%DEV' \
            AND lower(c.COLUMN_NAME) not like 'dw_%';"

    tmp_folder = "/var/tmp/amundsen/tables"
    node_files_folder = f"{tmp_folder}/nodes"
    relationship_files_folder = f"{tmp_folder}/relationships"

    user = Secret("SNOWFLAKE_USER").get()
    password = Secret("SNOWFLAKE_PASSWORD").get()
    account = Secret("SNOWFLAKE_ACCOUNT").get()
    warehouse = "TRANSFORMER_XS"
    connection_string = (
        f"snowflake://{user}:{password}@{account}/{database}?warehouse={warehouse}"
    )

    sql_extractor = SnowflakeMetadataExtractor()
    csv_loader = FsNeo4jCSVLoader()

    loading_task = DefaultTask(extractor=sql_extractor, loader=csv_loader)

    job_config = ConfigFactory.from_dict(
        {
            f"extractor.snowflake.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": connection_string,
            f"extractor.snowflake.{SnowflakeMetadataExtractor.SNOWFLAKE_DATABASE_KEY}": database,
            f"extractor.snowflake.{SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}": where_clause,
            f"loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}": node_files_folder,
            f"loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}": relationship_files_folder,
            f"loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}": True,
            f"loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.FORCE_CREATE_DIR}": True,
            f"publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}": node_files_folder,
            f"publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}": relationship_files_folder,
            f"publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}": neo4j_endpoint,
            f"publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}": neo4j_user,
            f"publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}": neo4j_password,
            f"publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}": "unique_tag",
        }
    )
    job = DefaultJob(conf=job_config, task=loading_task, publisher=Neo4jCsvPublisher())
    return job.launch()


@task
def es_publisher_job(
    elasticsearch_index_alias="table_search_index",
    elasticsearch_doc_type_key="table",
    model_name="databuilder.models.table_elasticsearch_document.TableESDocument",
    cypher_query=None,
    elasticsearch_mapping=None,
):
    """
    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                       `table_search_index`
    :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query:               Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
                                       it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                       if None is given (default) it uses the `Table` query baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = "/var/tmp/amundsen/search_data.json"

    es_task = DefaultTask(
        loader=FSElasticsearchJSONLoader(),
        extractor=Neo4jSearchDataExtractor(),
        transformer=NoopTransformer(),
    )

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = "tables" + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict(
        {
            f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}": neo4j_endpoint,
            f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}": model_name,
            f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}": neo4j_user,
            f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}": neo4j_password,
            f"loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}": extracted_search_data_path,
            f"loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}": "w",
            f"publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}": extracted_search_data_path,
            f"publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}": "r",
            f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}": elasticsearch_client,
            f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}": elasticsearch_new_index_key,
            f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}": elasticsearch_doc_type_key,
            f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}": elasticsearch_index_alias,
        }
    )

    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put(
            f"extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}",
            cypher_query,
        )
    if elasticsearch_mapping:
        job_config.put(
            f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}",
            elasticsearch_mapping,
        )

    job = DefaultJob(conf=job_config, task=es_task, publisher=ElasticsearchPublisher())
    return job.launch()


with Flow("Snowflake Metadata Extraction") as flow:
    databases = ["ANALYTICS"]
    for db in databases:
        snowflake_result = snowflake_metadata_job(db)

        # publish to Elasticsearch only after the Neo4j metadata load has finished
        es_publisher_job(upstream_tasks=[snowflake_result])


if __name__ == "__main__":
    flow.run()
Here is an example for Prefect < 2.
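If you are on Prefect 2, only the orchestration layer needs to change: wrap the same databuilder jobs in @task functions and call them from a @flow. A minimal sketch, assuming the job bodies stay as in the script above and that Snowflake credentials are stored in Prefect 2 Secret blocks with the (hypothetical) names shown below:

# Minimal Prefect 2 sketch -- orchestration only; the databuilder job bodies
# (Snowflake -> Neo4j, Neo4j -> Elasticsearch) stay the same as in the script above.
from prefect import flow, task
from prefect.blocks.system import Secret


@task(name="Snowflake Metadata Job")
def snowflake_metadata_job(database: str):
    # In Prefect 2, read credentials from Secret blocks instead of prefect.client.Secret.
    # The block names below are assumptions -- use whatever names you registered.
    user = Secret.load("snowflake-user").get()
    password = Secret.load("snowflake-password").get()
    account = Secret.load("snowflake-account").get()
    ...  # build and launch the Neo4j DefaultJob exactly as in the script above


@task(name="ES Publisher Job")
def es_publisher_job():
    ...  # build and launch the Elasticsearch DefaultJob exactly as in the script above


@flow(name="Snowflake Metadata Extraction")
def metadata_extraction(databases=("ANALYTICS",)):
    for db in databases:
        # Plain task calls (no .submit()) run sequentially, so the Elasticsearch
        # publish only starts after the Neo4j load for this database finishes.
        snowflake_metadata_job(db)
        es_publisher_job()


if __name__ == "__main__":
    metadata_extraction()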