Patrick Tan
11/29/2022, 4:59 PM
Khuyen Tran
11/29/2022, 4:59 PM
Patrick Tan
11/29/2022, 5:00 PM
"""
Module with main function
"""
import sys
from lib.aws import get_os_aws_auth
from lib.greatexpectations import validate_os_items, validate_spider_data
from lib.imageproxy import fetch_image, validate_image
from lib.jira import create_jira_ticket
from lib.opensearch import (
    delete_opensearch_items,
    read_from_opensearch,
    skip_existing_opensearch_docs,
    to_opensearch,
)
from lib.s3 import read_file_from_s3
from lib.sightengine import moderate_text_dummy
from lib.util import (
    collect_slack_notifications,
    get_config,
    send_slack_notification,
    update_flow_status,
)
from prefect import context, flow, task
from tasks.process import drop_temp_attributes, generate_pkey, keep_livelots_attributes
def _build_flow_run_url(api_url, flow_run_id):
    """Return the Prefect Cloud UI URL for a flow run, or "" if it cannot be derived.

    The account and workspace ids are located by the path segments that
    follow ``accounts`` and ``workspaces`` in ``api_url`` instead of by fixed
    positions, so an unset or non-Cloud API URL yields an empty string rather
    than an exception that would abort the flow before any work is done.
    """
    segments = (api_url or "").split("/")
    try:
        account_id = segments[segments.index("accounts") + 1]
        workspace_id = segments[segments.index("workspaces") + 1]
    except (ValueError, IndexError):
        return ""
    return (
        "https://app.prefect.cloud/account/"
        + account_id
        + "/workspace/"
        + workspace_id
        + "/flow-runs/flow-run/"
        + flow_run_id
    )


def _start_slack_report(
    collect, status, flow_run_name, flow_run_url, datafile_params
):
    """Build the header entries shared by the success and failure Slack reports.

    ``collect`` is the task-wrapped ``collect_slack_notifications``; its return
    value is threaded through successive calls and finally returned.
    """
    report = collect(message="Flow Run Status", obj=status)
    report = collect(report, "Flow Name", obj="livelots-etl")
    report = collect(report, "Flow Run Name", obj=flow_run_name)
    report = collect(report, "Flow Run URL", obj=flow_run_url)
    return collect(report, "Current datafile parameters", obj=datafile_params)


@flow()
def livelots_etl(
    bucket: str = "wp-qa-livelots",
    prefix: str = "12345/20990502_220131_12345-DS=436.csv",
    config_file: str = "{'aws_profile':'no-mfa','bucket':'wp-livelots-config-sandbox','key':'config/config-dev-test.yaml'}",
) -> int:
    """Run the live-lots ETL for one spider file and report the outcome to Slack.

    The flow loads the runtime configuration, reads the file from S3 and runs
    it through the validation, attribute-cleaning, key-generation,
    de-duplication, text-moderation, image fetch/validation and OpenSearch
    load/read-back/validation tasks. After each step the step's result is
    handed to ``collect_slack_notifications`` so the final Slack message
    carries one entry per stage.

    Args:
        bucket: Passed to ``get_config`` together with ``prefix``.
        prefix: Passed to ``get_config`` together with ``bucket``.
        config_file: Configuration locator passed to ``get_config``.

    Returns:
        ``0`` when every step succeeded.

    Raises:
        Exception: Any error raised by a step is re-raised after the flow
            status has been set to "Failed", an optional Jira ticket has been
            created and a failure notification has been sent to Slack.
    """
    # Flow-run identity, used in the Slack/Jira messages.
    flow_run = context.get_run_context().flow_run
    flow_run_name = flow_run.name

    # Direct link to this run in the Prefect Cloud UI.
    api_url = context.get_settings_context().settings.PREFECT_API_URL
    flow_run_url = _build_flow_run_url(api_url, str(flow_run.id))
    print(flow_run_url)

    # Wrap the plain functions as Prefect tasks. This happens before the
    # ``try`` so the failure handler below can always rely on the wrappers.
    get_config_task = task(get_config)
    read_file_from_s3_task = task(read_file_from_s3)
    delete_opensearch_items_task = task(delete_opensearch_items)
    skip_existing_opensearch_docs_task = task(skip_existing_opensearch_docs)
    generate_pkey_task = task(generate_pkey)
    to_opensearch_task = task(to_opensearch)
    get_os_aws_auth_task = task(get_os_aws_auth)
    validate_os_items_task = task(validate_os_items)
    fetch_image_task = task(fetch_image)
    moderate_text_dummy_task = task(
        moderate_text_dummy, retries=3, retry_delay_seconds=5
    )
    validate_image_task = task(validate_image, retries=3, retry_delay_seconds=5)
    keep_livelots_attributes_task = task(keep_livelots_attributes)
    validate_spider_data_task = task(validate_spider_data)
    drop_temp_attributes_task = task(drop_temp_attributes)
    collect_slack_notifications_task = task(collect_slack_notifications)
    send_slack_notification_task = task(send_slack_notification)
    read_from_opensearch_task = task(read_from_opensearch)

    try:
        runtime_params = get_config_task(config_file, bucket, prefix)
        data_for_slack = _start_slack_report(
            collect_slack_notifications_task,
            "Success",
            flow_run_name,
            flow_run_url,
            {"config_file": config_file, "bucket": bucket, "prefix": prefix},
        )

        aws_auth = get_os_aws_auth_task(runtime_params)

        df = read_file_from_s3_task(runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Lines read from spider file", df
        )

        df = validate_spider_data_task(df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Passed validation of spider file", df
        )

        attr_df = keep_livelots_attributes_task(runtime_params, df)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After cleaning attributes", attr_df
        )

        pkey_df = generate_pkey_task(attr_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After creating keys", pkey_df
        )

        # For test purposes: delete duplicate OS items if the config flag
        # force_delete_duplicates_os_items == 'yes'. The de-duplication step
        # waits for the delete so it never races against it.
        del_resp = delete_opensearch_items_task(pkey_df, runtime_params, aws_auth)
        deduped_df = skip_existing_opensearch_docs_task(
            pkey_df, runtime_params, aws_auth, wait_for=[del_resp]
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Lines after deduplication", deduped_df
        )

        text_moderated_df = moderate_text_dummy_task(deduped_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After text moderation", text_moderated_df
        )

        fetch_image_df = fetch_image_task(text_moderated_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After fetching images", fetch_image_df
        )

        validate_image_df = validate_image_task(fetch_image_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After image validation", validate_image_df
        )

        clean_df = drop_temp_attributes_task(validate_image_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After cleaning temp attrs", clean_df
        )

        injected_list = to_opensearch_task(clean_df, runtime_params, aws_auth)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Injected to OS", injected_list
        )

        injected_df = read_from_opensearch_task(injected_list, runtime_params, aws_auth)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Read back from OS for check", injected_df
        )

        validated_os_df = validate_os_items_task(injected_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Passed validation of OpenSearch items", validated_os_df
        )

        send_slack_notification_task(data_for_slack, runtime_params)
    except Exception:
        # Top-level boundary: report the failure, then re-raise so Prefect
        # marks the run as failed. The configuration is loaded again because
        # the error may have occurred before ``runtime_params`` was assigned.
        runtime_params = get_config_task(config_file, bucket, prefix)
        update_flow_status(runtime_params, "Failed")
        data_for_slack = _start_slack_report(
            collect_slack_notifications_task,
            "Fail",
            flow_run_name,
            flow_run_url,
            {
                "config_file": runtime_params["config_file"],
                "bucket": runtime_params["bucket"],
                "prefix": runtime_params["prefix"],
            },
        )
        if runtime_params["jira_creation"] == "yes":
            jira_title = f"{runtime_params['environment']}: Live Lots ETL failed"
            jira_content = (
                f"config_file: {runtime_params['config_file']}\n"
                f"bucket: {runtime_params['bucket']}\n"
                f"filename: {runtime_params['prefix']}\n"
                f"Prefect Flow URL: {flow_run_url}"
            )
            jira_response = create_jira_ticket(runtime_params, jira_title, jira_content)
            # Only link the ticket in Slack when Jira reports it was created.
            if jira_response["status_code"] == 201:
                data_for_slack = collect_slack_notifications_task(
                    data_for_slack, "Jira URL", obj=jira_response["issue_url"]
                )
        send_slack_notification_task(data_for_slack, runtime_params, fail=True)
        raise
    return 0
if __name__ == "__main__":
    # Command-line entry point. Three positional arguments are read, in this
    # order: config_file, bucket, prefix. Note that this differs from the
    # parameter order of the flow signature (bucket, prefix, config_file).
    # A missing argument raises IndexError on the corresponding line.
    par_config_file = sys.argv[1]
    par_bucket = sys.argv[2]
    par_prefix = sys.argv[3]
    livelots_etl(bucket=par_bucket, prefix=par_prefix, config_file=par_config_file)
Khuyen Tran
11/29/2022, 5:09 PM
task(your_function)
Patrick Tan
11/29/2022, 5:10 PM
Patrick Tan
11/29/2022, 5:11 PM
Zanie
Zanie
Patrick Tan
11/29/2022, 5:14 PM
Patrick Tan
11/29/2022, 8:50 PM
Zanie
S3.load(os.environ.get("MY_ENV", "dev"))
Zanie
Anna Geller
> I want to different S3 bucket for different environments (dev/qa/prod)
We generally recommend environment parity across workspaces, so the same bucket name s3/default could point to bucket A in the dev workspace and to bucket B in the prod workspace. If you are on OSS, you can use environment variables, as Michael recommended. Specifically for results, you can find some examples here: https://github.com/anna-geller/prefect-deployment-patterns/tree/main/results
Patrick Tan
11/30/2022, 2:36 PM