Patrick Tan
11/29/2022, 4:59 PMKhuyen Tran
11/29/2022, 4:59 PMPatrick Tan
11/29/2022, 5:00 PM
"""
Module with main function
"""
import sys
from lib.aws import get_os_aws_auth
from lib.greatexpectations import validate_os_items, validate_spider_data
from lib.imageproxy import fetch_image, validate_image
from lib.jira import create_jira_ticket
from lib.opensearch import (
delete_opensearch_items,
read_from_opensearch,
skip_existing_opensearch_docs,
to_opensearch,
)
from lib.s3 import read_file_from_s3
from lib.sightengine import moderate_text_dummy
from lib.util import (
collect_slack_notifications,
get_config,
send_slack_notification,
update_flow_status,
)
from prefect import context, flow, task
from tasks.process import drop_temp_attributes, generate_pkey, keep_livelots_attributes
@flow()
def livelots_etl(
    bucket: str = "wp-qa-livelots",
    prefix: str = "12345/20990502_220131_12345-DS=436.csv",
    config_file: str = "{'aws_profile':'no-mfa','bucket':'wp-livelots-config-sandbox','key':'config/config-dev-test.yaml'}",
) -> int:
    """Run the Live Lots ETL for one spider data file and report to Slack.

    Happy path, in order: load the runtime config, read the data file from
    S3, validate it, keep the Live Lots attributes, generate primary keys,
    delete/skip documents already in OpenSearch, moderate text, fetch and
    validate images, drop temporary attributes, write to OpenSearch, read
    the written items back, validate them, and send a Slack summary that
    was accumulated after every step.

    On any exception the config is loaded again, the flow status is set to
    "Failed", a failure summary is assembled, a Jira ticket is created when
    ``runtime_params["jira_creation"] == "yes"``, the Slack notification is
    sent with ``fail=True`` and the original exception is re-raised.

    Args:
        bucket: Bucket of the data file; forwarded to ``get_config``.
        prefix: Key of the data file; forwarded to ``get_config``.
        config_file: Config location; forwarded to ``get_config``. The
            default is a dict-style string with ``aws_profile``, ``bucket``
            and ``key`` entries.

    Returns:
        ``0`` when the flow completes without raising.

    Raises:
        Exception: Whatever a step raised, re-raised after the failure
            notifications have been sent.
    """
    # get flow run info
    run_ctx = context.get_run_context()
    flow_run_id = str(run_ctx.flow_run.id)
    flow_run_name = run_ctx.flow_run.name

    # get account and workspace from the API URL
    # NOTE(review): segments 5 and 7 presumably are the account id and the
    # workspace id of a Prefect Cloud API URL
    # (https://api.prefect.cloud/api/accounts/<id>/workspaces/<id>) -- confirm.
    # A URL with fewer segments (e.g. a local server) raises IndexError here.
    settings_ctx = context.get_settings_context()
    api_url = settings_ctx.settings.PREFECT_API_URL
    url_parts = api_url.split("/")
    account_id = url_parts[5]
    workspace = url_parts[7]

    # assemble flow run url for direct access
    # (plain URL: no surrounding "<...>" markup, which would break the link)
    flow_run_url = (
        "https://app.prefect.cloud/account/"
        + account_id
        + "/workspace/"
        + workspace
        + "/flow-runs/flow-run/"
        + flow_run_id
    )
    print(flow_run_url)

    # Wrap the library functions as Prefect tasks. This is done before the
    # ``try`` so the failure handler below can always rely on these names.
    get_config_task = task(get_config)
    read_file_from_s3_task = task(read_file_from_s3)
    delete_opensearch_items_task = task(delete_opensearch_items)
    skip_existing_opensearch_docs_task = task(skip_existing_opensearch_docs)
    generate_pkey_task = task(generate_pkey)
    to_opensearch_task = task(to_opensearch)
    get_os_aws_auth_task = task(get_os_aws_auth)
    validate_os_items_task = task(validate_os_items)
    fetch_image_task = task(fetch_image)
    moderate_text_dummy_task = task(
        moderate_text_dummy, retries=3, retry_delay_seconds=5
    )
    validate_image_task = task(validate_image, retries=3, retry_delay_seconds=5)
    keep_livelots_attributes_task = task(keep_livelots_attributes)
    validate_spider_data_task = task(validate_spider_data)
    drop_temp_attributes_task = task(drop_temp_attributes)
    collect_slack_notifications_task = task(collect_slack_notifications)
    send_slack_notification_task = task(send_slack_notification)
    read_from_opensearch_task = task(read_from_opensearch)

    try:
        runtime_params = get_config_task(config_file, bucket, prefix)

        # Header of the success notification.
        data_for_slack = collect_slack_notifications_task(
            message="Flow Run Status", obj="Success"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Name", obj="livelots-etl"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run Name", obj=flow_run_name
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run URL", obj=flow_run_url
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack,
            "Current datafile parameters",
            obj={"config_file": config_file, "bucket": bucket, "prefix": prefix},
        )

        aws_auth = get_os_aws_auth_task(runtime_params)

        # Each step's result is also handed to the Slack collector so the
        # final message reports on every stage.
        df = read_file_from_s3_task(runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Lines read from spider file", df
        )
        df = validate_spider_data_task(df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Passed validation of spider file", df
        )
        attr_df = keep_livelots_attributes_task(runtime_params, df)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After cleaning attributes", attr_df
        )
        pkey_df = generate_pkey_task(attr_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After creating keys", pkey_df
        )

        # for test purposes, delete dup OS items if flag
        # force_delete_duplicates_os_items in config == 'yes'
        del_resp = delete_opensearch_items_task(pkey_df, runtime_params, aws_auth)
        # wait_for orders the dedup step after the delete step.
        deduped_df = skip_existing_opensearch_docs_task(
            pkey_df, runtime_params, aws_auth, wait_for=[del_resp]
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Lines after deduplication", deduped_df
        )

        text_moderated_df = moderate_text_dummy_task(deduped_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After text moderation", text_moderated_df
        )
        fetch_image_df = fetch_image_task(text_moderated_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After fetching images", fetch_image_df
        )
        validate_image_df = validate_image_task(fetch_image_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After image validation", validate_image_df
        )
        clean_df = drop_temp_attributes_task(validate_image_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After cleaning temp attrs", clean_df
        )

        injected_list = to_opensearch_task(clean_df, runtime_params, aws_auth)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Injected to OS", injected_list
        )
        injected_df = read_from_opensearch_task(
            injected_list, runtime_params, aws_auth
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Read back from OS for check", injected_df
        )
        validated_os_df = validate_os_items_task(injected_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Passed validation of OpenSearch items", validated_os_df
        )

        send_slack_notification_task(data_for_slack, runtime_params)
    except Exception:
        # The config is loaded again because the failure may have happened
        # before (or while) runtime_params was assigned in the try body.
        runtime_params = get_config_task(config_file, bucket, prefix)
        update_flow_status(runtime_params, "Failed")

        # Header of the failure notification (built from scratch; whatever
        # the try body had collected so far is discarded).
        data_for_slack = collect_slack_notifications_task(
            message="Flow Run Status", obj="Fail"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Name", obj="livelots-etl"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run Name", obj=flow_run_name
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run URL", obj=flow_run_url
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack,
            "Current datafile parameters",
            obj={
                "config_file": runtime_params["config_file"],
                "bucket": runtime_params["bucket"],
                "prefix": runtime_params["prefix"],
            },
        )

        if runtime_params["jira_creation"] == "yes":
            jira_title = f"{runtime_params['environment']}: Live Lots ETL failed"
            jira_content = f"config_file: {runtime_params['config_file']}\nbucket: {runtime_params['bucket']}\nfilename: {runtime_params['prefix']}\nPrefect Flow URL: {flow_run_url}"
            jira_response = create_jira_ticket(runtime_params, jira_title, jira_content)
            # Only link the ticket in Slack when Jira reports it was created.
            if jira_response["status_code"] == 201:
                data_for_slack = collect_slack_notifications_task(
                    data_for_slack, "Jira URL", obj=jira_response["issue_url"]
                )

        send_slack_notification_task(data_for_slack, runtime_params, fail=True)
        # Re-raise so the flow run itself is recorded as failed.
        raise
    return 0
if __name__ == "__main__":
    # Positional CLI arguments, in order: <config_file> <bucket> <prefix>.
    cli_config_file, cli_bucket, cli_prefix = sys.argv[1], sys.argv[2], sys.argv[3]
    livelots_etl(config_file=cli_config_file, bucket=cli_bucket, prefix=cli_prefix)
Khuyen Tran
11/29/2022, 5:09 PM
`task(your_function)` to turn your function into a task?
Patrick Tan
11/29/2022, 5:10 PMZanie
11/29/2022, 5:13 PMPatrick Tan
11/29/2022, 5:14 PMZanie
11/29/2022, 9:02 PM
`S3.load(os.environ.get("MY_ENV", "dev"))` where you specify the storage; then you can create an S3 block for dev/prod and set the MY_ENV environment variable accordingly.
Anna Geller
11/30/2022, 1:35 PM
> I want to different S3 bucket for different environments (dev/qa/prod)

We generally recommend environment parity across workspaces, so the same bucket name s3/default could point to bucket A in the dev workspace and to bucket B in the prod workspace. If you are on OSS, you can use env variables as Michael recommended. Specifically for results, you can find some examples here: https://github.com/anna-geller/prefect-deployment-patterns/tree/main/results
Patrick Tan
11/30/2022, 2:36 PM