
Patrick Tan

11/29/2022, 4:59 PM
I am testing rerunning a failed flow (Prefect 2.0) and get this message: prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
✅ 1

Khuyen Tran

11/29/2022, 4:59 PM
Can you show an example of your code?

Patrick Tan

11/29/2022, 5:00 PM
"""
Module with main function
"""


import sys

from lib.aws import get_os_aws_auth
from lib.greatexpectations import validate_os_items, validate_spider_data
from lib.imageproxy import fetch_image, validate_image
from lib.jira import create_jira_ticket
from lib.opensearch import (
    delete_opensearch_items,
    read_from_opensearch,
    skip_existing_opensearch_docs,
    to_opensearch,
)
from lib.s3 import read_file_from_s3
from lib.sightengine import moderate_text_dummy
from lib.util import (
    collect_slack_notifications,
    get_config,
    send_slack_notification,
    update_flow_status,
)
from prefect import context, flow, task
from tasks.process import drop_temp_attributes, generate_pkey, keep_livelots_attributes


@flow()
def livelots_etl(
    bucket: str = "wp-qa-livelots",
    prefix: str = "12345/20990502_220131_12345-DS=436.csv",
    config_file: str = "{'aws_profile':'no-mfa','bucket':'wp-livelots-config-sandbox','key':'config/config-dev-test.yaml'}",
    #     config_file: str = "/Users/patricktan/bitbucket/data/live-lots-etl/live_lots_etl_batch/config-dev-test.yaml",
):

    # get flow run info
    ctx = context.get_run_context()
    flow_run_id = str(ctx.flow_run.id)
    flow_run_name = ctx.flow_run.name

    # get account id and workspace id from the API URL
    # (format: https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>)
    ctx = context.get_settings_context()
    api_url = ctx.settings.PREFECT_API_URL
    words = api_url.split("/")
    account_id = words[5]
    workspace = words[7]

    # assemble flow run url for direct access
    flow_run_url = (
        "https://app.prefect.cloud/account/"
        + account_id
        + "/workspace/"
        + workspace
        + "/flow-runs/flow-run/"
        + flow_run_id
    )
    print(flow_run_url)

    try:

        get_config_task = task(get_config)
        read_file_from_s3_task = task(read_file_from_s3)
        delete_opensearch_items_task = task(delete_opensearch_items)
        skip_existing_opensearch_docs_task = task(skip_existing_opensearch_docs)
        generate_pkey_task = task(generate_pkey)
        to_opensearch_task = task(to_opensearch)
        get_os_aws_auth_task = task(get_os_aws_auth)
        validate_os_items_task = task(validate_os_items)
        fetch_image_task = task(fetch_image)
        moderate_text_dummy_task = task(
            moderate_text_dummy, retries=3, retry_delay_seconds=5
        )
        validate_image_task = task(validate_image, retries=3, retry_delay_seconds=5)
        keep_livelots_attributes_task = task(keep_livelots_attributes)
        validate_spider_data_task = task(validate_spider_data)
        drop_temp_attributes_task = task(drop_temp_attributes)
        collect_slack_notifications_task = task(collect_slack_notifications)
        send_slack_notification_task = task(send_slack_notification)
        read_from_opensearch_task = task(read_from_opensearch)

        runtime_params = get_config_task(config_file, bucket, prefix)

        data_for_slack = collect_slack_notifications_task(
            message="Flow Run Status", obj="Success"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Name", obj="livelots-etl"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run Name", obj=flow_run_name
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run URL", obj=flow_run_url
        )

        data_for_slack = collect_slack_notifications_task(
            data_for_slack,
            "Current datafile parameters",
            obj={"config_file": config_file, "bucket": bucket, "prefix": prefix},
        )

        aws_auth = get_os_aws_auth_task(runtime_params)

        df = read_file_from_s3_task(runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Lines read from spider file", df
        )

        df = validate_spider_data_task(df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Passed validation of spider file", df
        )

        attr_df = keep_livelots_attributes_task(runtime_params, df)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After cleaning attributes", attr_df
        )

        pkey_df = generate_pkey_task(attr_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After creating keys", pkey_df
        )

        # for test purposes, delete dup OS items if flag force_delete_duplicates_os_items in config == 'yes'
        del_resp = delete_opensearch_items_task(pkey_df, runtime_params, aws_auth)

        deduped_df = skip_existing_opensearch_docs_task(
            pkey_df, runtime_params, aws_auth, wait_for=[del_resp]
        )

        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Lines after deduplication", deduped_df
        )

        text_moderated_df = moderate_text_dummy_task(deduped_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After text moderation", text_moderated_df
        )

        fetch_image_df = fetch_image_task(text_moderated_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After fetching images", fetch_image_df
        )

        validate_image_df = validate_image_task(fetch_image_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After image validation", validate_image_df
        )

        clean_df = drop_temp_attributes_task(validate_image_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "After cleaning temp attrs", clean_df
        )

        injected_list = to_opensearch_task(clean_df, runtime_params, aws_auth)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Injected to OS", injected_list
        )

        injected_df = read_from_opensearch_task(injected_list, runtime_params, aws_auth)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Read back from OS for check", injected_df
        )

        validated_os_df = validate_os_items_task(injected_df, runtime_params)
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Passed validation of OpenSearch items", validated_os_df
        )

        send_slack_notification_task(data_for_slack, runtime_params)

    except Exception:
        runtime_params = get_config_task(config_file, bucket, prefix)
        update_flow_status(runtime_params, "Failed")
        data_for_slack = collect_slack_notifications_task(
            message="Flow Run Status", obj="Fail"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Name", obj="livelots-etl"
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run Name", obj=flow_run_name
        )
        data_for_slack = collect_slack_notifications_task(
            data_for_slack, "Flow Run URL", obj=flow_run_url
        )

        data_for_slack = collect_slack_notifications_task(
            data_for_slack,
            "Current datafile parameters",
            obj={
                "config_file": runtime_params["config_file"],
                "bucket": runtime_params["bucket"],
                "prefix": runtime_params["prefix"],
            },
        )
        if runtime_params["jira_creation"] == "yes":
            jira_title = f"{runtime_params['environment']}: Live Lots ETL failed"
            jira_content = f"config_file: {runtime_params['config_file']}\nbucket: {runtime_params['bucket']}\nfilename: {runtime_params['prefix']}\nPrefect Flow URL: {flow_run_url}"
            jira_response = create_jira_ticket(runtime_params, jira_title, jira_content)
            if jira_response["status_code"] == 201:
                data_for_slack = collect_slack_notifications_task(
                    data_for_slack, "Jira URL", obj=jira_response["issue_url"]
                )

        send_slack_notification_task(data_for_slack, runtime_params, fail=True)

        raise

    return 0


if __name__ == "__main__":
    par_config_file = sys.argv[1]
    par_bucket = sys.argv[2]
    par_prefix = sys.argv[3]
    livelots_etl(bucket=par_bucket, prefix=par_prefix, config_file=par_config_file)
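The flow-run URL in the code above is built by splitting PREFECT_API_URL on "/" and taking positions 5 and 7, which hold the account ID and the workspace ID (not an API key). A slightly sturdier sketch looks the segments up by name instead of by position. It assumes the Prefect Cloud API URL has the form https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id> and that the UI link uses the same /account/.../workspace/.../flow-runs/flow-run/... path as the original code; build_flow_run_url is a hypothetical helper name.

```python
from urllib.parse import urlparse


def build_flow_run_url(api_url: str, flow_run_id: str) -> str:
    """Build a Prefect Cloud UI link for a flow run (hypothetical helper).

    Assumes api_url has the form
    https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
    """
    # Non-empty path segments, e.g. ["api", "accounts", <id>, "workspaces", <id>]
    parts = [p for p in urlparse(api_url).path.split("/") if p]
    try:
        account_id = parts[parts.index("accounts") + 1]
        workspace_id = parts[parts.index("workspaces") + 1]
    except (ValueError, IndexError):
        # ValueError: segment missing (e.g. a local OSS server URL);
        # IndexError: segment present but no ID follows it.
        raise ValueError(f"not a Prefect Cloud workspace API URL: {api_url!r}") from None
    return (
        f"https://app.prefect.cloud/account/{account_id}"
        f"/workspace/{workspace_id}/flow-runs/flow-run/{flow_run_id}"
    )
```

Inside the flow this would replace the split-and-index block with something like flow_run_url = build_flow_run_url(str(ctx.settings.PREFECT_API_URL), flow_run_id). Pointed at an OSS server URL such as http://127.0.0.1:4200/api, it raises an explicit ValueError rather than an IndexError from a positional lookup.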

Khuyen Tran

11/29/2022, 5:09 PM
Which version of Prefect are you on? Are you using task(your_function) to turn your function into a task?

Patrick Tan

11/29/2022, 5:10 PM
Prefect 2.6.7, and yes, I'm using the task() function.
In Prefect 1.0, I was always able to retry a failed flow.

Zanie

11/29/2022, 5:13 PM
If you're going to rerun a failed flow, you need to turn on result persistence.
🙌 1

Patrick Tan

11/29/2022, 5:14 PM
Thanks!
I was able to rerun the failed flow successfully after adding persist_result=True to the flow and all its tasks. My next question is about result_storage: I tested it and was able to persist results in an S3 bucket. How can I make the S3 bucket a variable that I specify at runtime or at deployment? I want to use a different S3 bucket for each environment (dev/qa/prod).
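The livelots_etl flow posted earlier wraps about twenty functions with task(...), so one way to make sure none of them misses persist_result=True is to apply the shared options in a single place. A minimal sketch, assuming the Prefect 2.x call form task(fn, **options) that the original code already uses (e.g. task(validate_image, retries=3, retry_delay_seconds=5)); make_tasks and DEFAULT_TASK_OPTIONS are hypothetical names, and the task factory is passed in so the helper itself does not import Prefect.

```python
from typing import Any, Callable, Dict, Iterable, Optional

# Options applied to every task. persist_result=True is the setting the
# thread found necessary for rerunning a failed flow run.
DEFAULT_TASK_OPTIONS: Dict[str, Any] = {"persist_result": True}


def make_tasks(
    task_factory: Callable[..., Any],
    functions: Iterable[Callable[..., Any]],
    overrides: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Wrap each function as task_factory(fn, **options) (hypothetical helper).

    overrides maps a function name to extra options for that function only,
    e.g. {"validate_image": {"retries": 3, "retry_delay_seconds": 5}}.
    """
    overrides = overrides or {}
    wrapped: Dict[str, Any] = {}
    for fn in functions:
        # Build a fresh dict per function; per-function overrides win over defaults.
        options = {**DEFAULT_TASK_OPTIONS, **overrides.get(fn.__name__, {})}
        wrapped[fn.__name__] = task_factory(fn, **options)
    return wrapped
```

In the flow this could be called as tasks = make_tasks(task, [get_config, read_file_from_s3, validate_image], overrides={"validate_image": {"retries": 3, "retry_delay_seconds": 5}}) and used as tasks["get_config"](config_file, bucket, prefix), with the flow itself decorated as @flow(persist_result=True).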

Zanie

11/29/2022, 9:02 PM
You can use S3.load(os.environ.get("MY_ENV", "dev")) where you specify the storage, then create an S3 block for dev and one for prod and set the MY_ENV environment variable accordingly.
🙌 1
:gratitude-thank-you: 1
cc @Anna Geller, who might have an example of this
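The MY_ENV lookup suggested above can be wrapped in a small helper that also rejects unexpected values, so a typo in the environment variable fails loudly instead of trying to load a block that does not exist. A minimal sketch, assuming one S3 block per environment named dev, qa and prod (these block names are assumptions); result_storage_block_name is a hypothetical helper, and its return value is what would be passed to S3.load(...).

```python
import os
from typing import Mapping, Optional

# Assumed block names: one S3 block per environment.
ALLOWED_ENVIRONMENTS = ("dev", "qa", "prod")


def result_storage_block_name(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the S3 block name to load for result storage (hypothetical helper).

    Reads MY_ENV from the given mapping (os.environ by default) and falls
    back to "dev", mirroring os.environ.get("MY_ENV", "dev").
    """
    env = os.environ if env is None else env
    name = env.get("MY_ENV", "dev")
    if name not in ALLOWED_ENVIRONMENTS:
        raise ValueError(
            f"MY_ENV={name!r} is not one of {', '.join(ALLOWED_ENVIRONMENTS)}"
        )
    return name
```

In a Prefect 2.x flow this would be used roughly as @flow(persist_result=True, result_storage=S3.load(result_storage_block_name())), with MY_ENV set in whatever environment the flow runs in. Note that the block is loaded when the module is imported, so MY_ENV must already be set at that point.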

Anna Geller

11/30/2022, 1:35 PM
"I want to different S3 bucket for different environments (dev/qa/prod)"
We generally recommend environment parity across workspaces, so the same block name, s3/default, could point to bucket A in the dev workspace and to bucket B in the prod workspace. If you are on OSS, you can use environment variables as Michael recommended. For results specifically, you can find some examples here: https://github.com/anna-geller/prefect-deployment-patterns/tree/main/results
:thank-you: 1

Patrick Tan

11/30/2022, 2:36 PM
Thanks, I'll look into it.
🙌 1