Thread
#prefect-community
    Ken Nguyen

    5 months ago
    I currently have a flow that is set up like so:
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    from prefect.triggers import all_finished
    
    # RUN_CONFIG, STORAGE, and the get_logs task are not shown here
    with Flow("data-quality-tracking-model-run-duration-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    
      dbt_run_flow_run_id = create_flow_run(
          flow_name="test-dbt-run-flow",
          project_name="data_quality_tracking"
      )
    
      flow_run = wait_for_flow_run(
          dbt_run_flow_run_id, raise_final_state=True, stream_logs=True
      )
    
      flow_logs = get_logs(
          dbt_run_flow_run_id,
          task_args={"name": "Getting logs", "trigger": all_finished},
          upstream_tasks=[flow_run],
      )
    get_logs is a function that takes in a flow_run_id, creates a FlowRunView, then gets the logs of that FlowRunView. My issue is that despite having both upstream_tasks defined AND a FlowRunView.get_latest() call in the get_logs task, I'm still getting logs that are incomplete. Do you have any suggestions for why my get_logs function is retrieving the child flow's logs prematurely, or retrieving incomplete logs?
    Not a super helpful picture, but it's meant to show that the final line of the retrieved logs is not the expected final line. When checking the child flow's logs within the parent flow's logs (they're streamed there because stream_logs=True), I also see incomplete logs.
    However, when I go into the child flow's logs directly, I see they were complete.
    Anna Geller

    5 months ago
    I think there is a much easier way to retrieve dbt logs from your run. Right now, you have:
    1. one flow that triggers dbt and returns its output,
    2. it then prints the dbt output,
    3. Prefect takes this printed output and stores it in logs in the backend,
    4. you then have another process, get_logs(), retrieving dbt logs from the backend,
    5. finally, you can do something with those retrieved logs (if I recall, you wanted to send them to some log aggregation service?).
    The dbt_run result already contains all dbt logs automatically, as long as you set these arguments to True:
    from prefect.tasks.dbt.dbt import DbtShellTask
    
    dbt = DbtShellTask(
        return_all=True,
        log_stdout=True,
        log_stderr=True,
    )
    You can cut steps 2-4, leaving only steps 1 and 5, if you move the functionality that handles your dbt logs into your "test-dbt-run-flow":
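    # Inside the "test-dbt-run-flow" Flow block
    dbt_run = dbt(
        command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials
    )
    # Placeholder for your own task that handles the list of dbt output lines
    task_sending_dbt_logs_somewhere(dbt_run)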
    It's always better to explain your use case and the problem you are trying to solve, rather than asking about a specific feature (such as retrieving logs from a flow run), because then we can suggest much simpler solutions like the one shown here.
    Ken Nguyen

    5 months ago
    Thanks Anna, I will definitely include more context next time! Your solution is definitely better, but I am still very curious to know why my current method was retrieving incomplete logs. I'm worried that if the incomplete logs are due to an issue with how I set upstream tasks, then implementing the suggested solution above would still have the same problem.
    Additionally, you also said:
    You can cut the steps 2-4, leaving only 1 and 5 if you move the functionality that needs to do something with your dbt logs to your “test-dbt-run-flow”.
    Would it be possible to pass the dbt logs (a list object) to another flow instead of moving the log handling process into test-dbt-run-flow? This is not an absolute need, but I'm asking because I have a few dbt flows that I might want to apply the same log handling process to.
    Anna Geller

    5 months ago
    I am still very curious to know why my current method was retrieving incomplete logs
    Perhaps this was just a latency issue? It can take some time before the logs are written (we write them in batches on the backend side), so your parent flow run may have tried to retrieve the child flow run's logs before they were all written.
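To guard against that batching delay, one option is to poll until the number of log entries stops growing. The sketch below assumes your retrieval step can be wrapped in a zero-argument callable; wait_for_stable_logs and its poll_interval, stable_polls, and timeout parameters are hypothetical names, and a stable count is only a heuristic, not a guarantee that every batch has been flushed.

```python
import time
from typing import Callable, List, Sequence


def wait_for_stable_logs(
    fetch_logs: Callable[[], Sequence],
    poll_interval: float = 5.0,
    stable_polls: int = 2,
    timeout: float = 120.0,
) -> List:
    """Re-fetch logs until their count stops changing (or timeout expires).

    Returns the most recent fetch once the number of entries has been
    unchanged for `stable_polls` consecutive polls.
    """
    deadline = time.monotonic() + timeout
    logs = list(fetch_logs())
    unchanged = 0
    while unchanged < stable_polls and time.monotonic() < deadline:
        time.sleep(poll_interval)
        latest = list(fetch_logs())
        # Reset the counter whenever new entries arrived since the last poll
        unchanged = unchanged + 1 if len(latest) == len(logs) else 0
        logs = latest
    return logs
```

In Prefect 1.x the callable might wrap FlowRunView.from_flow_run_id(flow_run_id).get_logs(), assuming that method is available in your Prefect version. Because the loop sleeps between polls, it blocks the task that runs it for up to timeout seconds.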
    I’m asking because I have a few dbt flows that I might want to apply the same log handling process to
    You can always call the same log-handling callable from a task in any flow that needs it. You could write a custom module for it and import it wherever it's needed.
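A minimal sketch of that shared-module approach, assuming each flow's DbtShellTask is configured with return_all=True so its result is a list of output lines. The module name dbt_log_handling and the function extract_test_problems are hypothetical, and the marker strings are borrowed from the filter Ken shares later in this thread.

```python
# dbt_log_handling.py: hypothetical shared module that any dbt flow can import.
from typing import List, Optional

# Marker strings borrowed from Ken's filter (assumed to match your dbt output)
TEST_PROBLEM_MARKERS = ("Failure in test ", "Warning in test ")


def extract_test_problems(lines: Optional[List[str]]) -> List[str]:
    """Return the dbt output lines that report a failing or warning test.

    `lines` is the list a DbtShellTask returns when return_all=True;
    None is treated as an empty list.
    """
    return [
        line
        for line in (lines or [])
        if any(marker in line for marker in TEST_PROBLEM_MARKERS)
    ]
```

Each flow would then import extract_test_problems and call it inside its own small @task that receives the dbt task's result, so the filtering rule lives in one place instead of being copied into every flow.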
    Ken Nguyen

    5 months ago
    That sounds good! Is the success/failure status of a flow raised before all the logs are finished writing?
    Anna Geller

    5 months ago
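    The flow run status is not batched. It's set immediately, so it should be raised immediately if the flow run fails. That means a flow run can already be in its final state while some of its logs are still being written.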
    Edvard Kristiansen

    4 months ago
    Hey, I am looking to do something similar to the above. My goal is to send failed dbt tests as a notification in Teams, including the failed tests from the logs. However, I am struggling to access the dbt logs. @Ken Nguyen, is there any way you could share the get_logs() code that worked in the end?
    Ken Nguyen

    4 months ago
    Of course! I will include my code below. For a bit of context, the way I have it set up is that my flow runs dbt test, then takes the logs from that and makes them into a dataframe, which I push into Snowflake (from there I feed it into a Looker dashboard).
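    ##=========================================##
    ##=========== DBTSHELLTASK SETUP ==========##
    ##=========================================##
    import prefect
    from prefect import Flow, task
    from prefect.client import Secret
    from prefect.tasks.dbt.dbt import DbtShellTask
    from prefect.triggers import all_finished
    
    secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()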
    
    dbt = DbtShellTask(
        return_all=True,
        profile_name="drsquatch_dev",
        environment="dev",
        # profiles_dir=".",
        overwrite_profiles=True,
        log_stdout=True,
        helper_script="cd dbt/dbt",
        log_stderr=True,
        stream_output=True,
        dbt_kwargs={
            "type": "snowflake",
            "account": secret['SNOWFLAKE_ACCOUNT'],
            "user": secret['SNOWFLAKE_USERNAME'],
            "password": secret['SNOWFLAKE_PASSWORD'],
            "role": secret['SNOWFLAKE_ROLE'],
            "database": str(secret['SNOWFLAKE_DB']),
            "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
            "schema": str(secret['SNOWFLAKE_SCHEMA']),
            "threads": 12,
            "client_session_keep_alive": False,
        },
        # state_handlers=[post_to_slack]
    )
    ##=========================================##
    ##=========================================##
    
    
    
    ##=========================================##
    ##========= LOGS PROCESSING TASKS =========##
    ##=========================================##
    import pandas as pd
    
    @task
    def get_relevant_logs(dbt_run):
        # Function to get dbt task logs + filter out relevant logs only
        logger = prefect.context.get("logger")
    
        logger.info("Getting raw logs from dbt task:")
        logger.info(dbt_run)
    
        logger.info("Converting raw logs into a dataframe:")
        raw_logs_df = pd.DataFrame(dbt_run)
        raw_logs_df.columns = ['message']
        logger.info(raw_logs_df)
    
        logger.info("Select only relevant records from flow_logs (dbt tests and results):")
        # Find records that contain dbt tests
        test_log = raw_logs_df[raw_logs_df['message'].str.contains('Failure in test |Warning in test ')]
        test_log = test_log['message']
    
        # Get index of results (the line right after each test); skip a test on the last line
        test_index = test_log.index
        results_index = [i + 1 for i in test_index if i + 1 < len(raw_logs_df)]
    
        # Get results, adjust index to match with test_index for joining
        results = raw_logs_df.iloc[results_index]
        results = results['message']
        results.index -= 1
    
        logger.info("Combining test and results into one dataframe:")
        output = pd.merge(test_log, results, left_index=True, right_index=True)
        output.columns = ['test_log', 'results_log']
        logger.info(output)
    
        return output
    
    
    @task
    def get_flow_run_id():
        # Function to get flow_run_name (but called flow_run_id) from context
        logger = prefect.context.get("logger")
    
        logger.info("Getting name of run for labeling purposes:")
        output = prefect.context.get("flow_run_name")
        logger.info(output)
    
        return output
    
    
    import datetime
    
    @task
    def get_time_of_flow_run():
        # Function to use the current time as the time of run
        logger = prefect.context.get("logger")
    
        logger.info("Getting time of run for labeling purposes:")
        output = datetime.datetime.now()
        logger.info(output)
    
        return output
    
    
    @task
    def clean_logs(relevant_logs, run_id, time_of_run):
        # Function to extract test names and their corresponding results + test_type
        logger = prefect.context.get("logger")
    
        logger.info("Cleaning logs + labeling:")
        # Raw strings so that \w is passed to the regex engine unchanged
        test_cleaned = relevant_logs['test_log'].str.extract(r'Failure in test (\w+) |Warning in test (\w+) ').bfill(axis=1)[0]
        results_cleaned = relevant_logs['results_log'].str.extract(r'Got (\w+) result')
        test_type = relevant_logs['results_log'].str.extract(r'configured to (\w+) if')
    
        relevant_logs = pd.merge(relevant_logs, test_cleaned, left_index=True, right_index=True)
        relevant_logs = pd.merge(relevant_logs, results_cleaned, left_index=True, right_index=True)
        relevant_logs = pd.merge(relevant_logs, test_type, left_index=True, right_index=True)
    
        relevant_logs.columns = ['test_log', 'results_log', 'test_id', 'results', 'test_type']
    
        relevant_logs['run_id'] = run_id
        relevant_logs['time_of_run'] = time_of_run
    
        output = relevant_logs[['run_id', 'time_of_run', 'test_id', 'results', 'test_type']]
        logger.info(output)
    
        return output
    
    
    from sqlalchemy import create_engine
    from snowflake.sqlalchemy import URL
    
    # Reuses the secret dict fetched in the DbtShellTask setup above
    
    @task
    def df_to_snowflake(df, table_name, exist_option):
        # Function to push dataframe to Snowflake table
        logger = prefect.context.get("logger")
    
        logger.info("Pushing dataframe to Snowflake:")
        url = URL(
            account = str(secret['SNOWFLAKE_ACCOUNT']),
            user = str(secret['SNOWFLAKE_USERNAME']),
            password = str(secret['SNOWFLAKE_PASSWORD']),
            database = str(secret['SNOWFLAKE_DB']),
            schema = str(secret['SNOWFLAKE_SCHEMA']),
            warehouse = str(secret['SNOWFLAKE_WAREHOUSE']),
            role = str(secret['SNOWFLAKE_ROLE']),
        )
        engine = create_engine(url)
    
        connection = engine.connect()
        try:
            df.to_sql(table_name, con=connection, index=False, chunksize=16000, if_exists=exist_option)
            logger.info("Successfully pushed to Snowflake")
        finally:
            # Release the connection even if the insert fails
            connection.close()
            engine.dispose()
    ##=========================================##
    ##=========================================##
    
    
    
    ##=========================================##
    ##================= FLOW ==================##
    ##=========================================##
    # RUN_CONFIG, STORAGE, post_to_slack, and pull_dbt_repo are not shown in this snippet
    with Flow("dbt-test-all-flow", run_config=RUN_CONFIG, storage=STORAGE, state_handlers=[post_to_slack]) as flow:
        pull_repo = pull_dbt_repo()
    
        deps = dbt(
            command="dbt deps",
            task_args={"name": "DBT: Dependencies"},
            upstream_tasks=[pull_repo],
        )
    
        test_dbt_run = dbt(
            command="dbt test",
            task_args={"name": "DBT: Test All"},
            upstream_tasks=[deps],
        )
    
        relevant_logs = get_relevant_logs(
            test_dbt_run,
            task_args={"name": "Getting dbt Task Logs + Filtering Relevant Logs", "trigger": all_finished},
            upstream_tasks=[test_dbt_run],
        )
    
        flow_run_id = get_flow_run_id(
            task_args={"name": "Getting Name of Run"},
        )
    
        time_of_flow_run = get_time_of_flow_run(
            task_args={"name": "Getting Time of Run"},
        )
    
        final_output = clean_logs(
            relevant_logs, flow_run_id, time_of_flow_run,
            task_args={"name": "Cleaning + Labeling Logs"},
            upstream_tasks=[time_of_flow_run],
        )
    
        df_to_snowflake(
            final_output, "dbt_test_errors", "append",
            task_args={"name": "Pushing Dataframe to Snowflake"},
            upstream_tasks=[final_output],
        )
    
        flow.set_reference_tasks([test_dbt_run])
    ##=========================================##
    ##=========================================##
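The pairing step in get_relevant_logs and clean_logs can be checked without Prefect, pandas, or Snowflake. The sketch below reimplements it with Python's standard re module, reusing the same regular expressions as raw strings. parse_dbt_test_lines is a hypothetical name; like the code above, it assumes each result line immediately follows its test line, and it skips a test line that has no following line.

```python
import re
from typing import Dict, List, Optional

# Same patterns as get_relevant_logs / clean_logs, written as raw strings
FILTER_RE = re.compile(r"Failure in test |Warning in test ")
TEST_RE = re.compile(r"Failure in test (\w+) |Warning in test (\w+) ")
RESULT_RE = re.compile(r"Got (\w+) result")
TYPE_RE = re.compile(r"configured to (\w+) if")


def parse_dbt_test_lines(lines: List[str]) -> List[Dict[str, Optional[str]]]:
    """Pair each failing/warning test line with the result line right after it."""
    rows = []
    for i, line in enumerate(lines):
        # Skip non-test lines and a test line with no following result line
        if not FILTER_RE.search(line) or i + 1 >= len(lines):
            continue
        next_line = lines[i + 1]
        test = TEST_RE.search(line)
        result = RESULT_RE.search(next_line)
        test_type = TYPE_RE.search(next_line)
        rows.append({
            # Like .bfill(axis=1)[0]: take whichever alternative matched
            "test_id": (test.group(1) or test.group(2)) if test else None,
            "results": result.group(1) if result else None,
            "test_type": test_type.group(1) if test_type else None,
        })
    return rows
```

Feeding it a few lines captured from your own dbt test output is a quick way to confirm the regexes still match if your dbt version words these lines differently.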
    Edvard Kristiansen

    4 months ago
    This is perfect, thank you so much @Ken Nguyen!