# prefect-community
k
I currently have a flow that is set up like so:
with Flow("data-quality-tracking-model-run-duration-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:

    dbt_run_flow_run_id = create_flow_run(
        flow_name="test-dbt-run-flow",
        project_name="data_quality_tracking"
    )

    flow_run = wait_for_flow_run(
        dbt_run_flow_run_id, raise_final_state=True, stream_logs=True
    )

    flow_logs = get_logs(
        dbt_run_flow_run_id,
        task_args={"name": "Getting logs", "trigger": all_finished},
        upstream_tasks=[flow_run],
    )
get_logs is a function that takes in a flow_run_id, creates a FlowRunView, then gets the logs of that FlowRunView. My issue is that despite having both the upstream_tasks defined AND a FlowRunView.get_latest() call for the get_logs task, I'm still getting logs that are incomplete. Do you have any suggestions for why my get_logs function is prematurely retrieving logs of the child flow/retrieving incomplete logs?
Not super helpful picture, but it's meant to show that the final line of the logs retrieved is not the expected final line. When checking the child flow's logs within the parent flow's logs (because stream_logs=True), I also see incomplete logs. However, when I go into the child flow's logs itself, I see that it was complete.
a
I think there is a much easier way to retrieve dbt logs from your run. Right now, you have:
1. one flow that triggers dbt and returns its output,
2. it then prints the dbt output,
3. Prefect takes this printed output and stores it in logs in the backend,
4. you then have another process, get_logs(), retrieving the dbt logs from the backend,
5. finally, you can do something with those retrieved logs (if I recall, you wanted to send it to some log aggregation service?).
The dbt_run task already contains all dbt logs automatically as long as you set those arguments to True:
dbt = DbtShellTask(
    return_all=True,
    log_stdout=True,
    log_stderr=True)
You can cut the steps 2-4, leaving only 1 and 5 if you move the functionality that needs to do something with your dbt logs to your "test-dbt-run-flow".
dbt_run = dbt(
    command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials
)
task_sending_dbt_logs_somewhere(dbt_run)
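For step 5, task_sending_dbt_logs_somewhere could look roughly like this (just a sketch; the requests library, the endpoint, and the payload shape are placeholders for whatever log aggregation service you use):

import requests  # assumption: the destination accepts HTTP; adjust for your service
from prefect import task

@task
def task_sending_dbt_logs_somewhere(dbt_output):
    # dbt_output is the list of output lines returned by DbtShellTask(return_all=True)
    LOG_ENDPOINT = "https://example.com/ingest"  # placeholder URL
    requests.post(LOG_ENDPOINT, json={"lines": dbt_output})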
It's always better to explain your use case and the problem you're trying to solve rather than asking about specific features (such as retrieving logs from a flow run), because then we can provide much easier solutions, as shown here.
k
Thanks Anna, will definitely include more context next time! Your solution is definitely better, but I am still very curious to know why my current method was retrieving incomplete logs. I'm worried that if the incomplete logs are due to some issues with setting upstream tasks, then implementing the suggested solution above would still have the same problem.
Additionally, you also said:
You can cut the steps 2-4, leaving only 1 and 5 if you move the functionality that needs to do something with your dbt logs to your “test-dbt-run-flow”.
Would you be able to pass on the dbt logs (list object) to another flow instead of moving the log handling process to test-dbt-run-flow? This is not an absolute need, but I'm asking because I have a few dbt flows that I might want to apply the same log handling process to.
a
I am still very curious to know why my current method was retrieving incomplete logs
Perhaps this was just a latency issue? It can take some time before the logs are written (we write them in batches on the backend side), so it could be that your get_logs task was retrieving the child flow run's logs before they were all written.
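If you do keep the get_logs approach, one rough way to guard against that latency (just a sketch; the wait interval and retry count are arbitrary) is to re-fetch until the log count stops growing:

import time
from prefect.backend import FlowRunView

def fetch_stable_logs(flow_run_id, wait_seconds=10, max_checks=6):
    # Re-fetch logs until two consecutive reads return the same number of entries
    previous_count = -1
    logs = []
    for _ in range(max_checks):
        logs = FlowRunView.from_flow_run_id(flow_run_id).get_logs()
        if len(logs) == previous_count:
            break
        previous_count = len(logs)
        time.sleep(wait_seconds)
    return logs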
I’m asking because I have a few dbt flows that I might want to apply the same log handling process to
You can always call the same log handling callable within your task in any flow that needs it. You could write a custom module for it and import it in any module that needs it.
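For example, something along these lines (just a sketch; the module and function names are placeholders):

# dbt_log_handling.py -- hypothetical shared module
def handle_dbt_logs(dbt_output):
    # whatever reusable processing/forwarding you want to apply to the dbt output lines
    ...

# in each flow's module that needs it:
from prefect import task
from dbt_log_handling import handle_dbt_logs

@task
def process_dbt_logs(dbt_output):
    return handle_dbt_logs(dbt_output)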
k
That sounds good! Is the success/failure status of a flow raised before all the logs are finished writing?
👍 1
a
The flow run status is not batched. It's set immediately, so it should be raised immediately if the flow run fails.
👀 1
e
Hey, I am looking to do something similar to the above. My goal is to send failed dbt tests as a notification in Teams, including the failed tests from the logs. However, I am struggling to access the dbt logs. @Ken Nguyen, any way you could share the get_logs() code that worked in the end?
k
Of course! I will include my code below. For a bit of context, the way I have it set up is that my flow runs dbt test, and then it takes the logs from that and makes them into a dataframe, which I push into Snowflake (from there I feed it into a Looker dashboard).
##=========================================##
##=========== DBTSHELLTASK SETUP ==========##
##=========================================##
import prefect
from prefect import task, Flow
from prefect.client import Secret
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.triggers import all_finished

# RUN_CONFIG, STORAGE, pull_dbt_repo, and post_to_slack are defined elsewhere in the project
secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()

dbt = DbtShellTask(
    return_all=True,
    profile_name="drsquatch_dev",
    environment="dev",
    # profiles_dir=".",
    overwrite_profiles=True,
    log_stdout=True,
    helper_script="cd dbt/dbt",
    log_stderr=True,
    stream_output=True,
    dbt_kwargs={
        "type": "snowflake",
        "account": secret['SNOWFLAKE_ACCOUNT'],
        "user": secret['SNOWFLAKE_USERNAME'],
        "password": secret['SNOWFLAKE_PASSWORD'],
        "role": secret['SNOWFLAKE_ROLE'],
        "database": str(secret['SNOWFLAKE_DB']),
        "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
        "schema": str(secret['SNOWFLAKE_SCHEMA']),
        "threads": 12,
        "client_session_keep_alive": False,
    },
    # state_handlers=[post_to_slack]
)
##=========================================##
##=========================================##



##=========================================##
##========= LOGS PROCESSING TASKS =========##
##=========================================##
import pandas as pd

@task
def get_relevant_logs(dbt_run):
    # Function to get dbt task logs + filter out relevant logs only
    logger = prefect.context.get("logger")

    logger.info("Getting raw logs from dbt task:")
    logger.info(dbt_run)

    logger.info("Converting raw logs into a dataframe:")
    raw_logs_df = pd.DataFrame(dbt_run)
    raw_logs_df.columns = ['message']
    logger.info(raw_logs_df)

    logger.info("Select only relevant records from flow_logs (dbt tests and results):")
    test_log = raw_logs_df[raw_logs_df['message'].str.contains('Failure in test |Warning in test ')]
    test_log = test_log['message']
    # Find records that contains dbt tests

    test_index = test_log.index
    results_index = [i+1 for i in test_index]
    # Get index of results based on index of tests from dataframe

    results = raw_logs_df.iloc[results_index]
    results = results['message']
    results.index -= 1
    # Get results, adjust index to match with test_index for joining

    logger.info("Combining test and results into one dataframe:")
    output = pd.merge(test_log, results, left_index=True, right_index=True)
    output.columns = ['test_log','results_log']
    logger.info(output)

    return output


@task
def get_flow_run_id():
    # Function to get flow_run_name (but called flow_run_id) from context
    logger = prefect.context.get("logger")

    logger.info("Getting name of run for labeling purposes:")
    output = prefect.context.get("flow_run_name")
    logger.info(output)

    return output


import datetime

@task
def get_time_of_flow_run():
    # Function to record the current time as the time of the flow run
    logger = prefect.context.get("logger")

    logger.info("Getting time of run for labeling purposes:")
    output = datetime.datetime.now()
    logger.info(output)

    return output


@task
def clean_logs(relevant_logs, run_id, time_of_run):
    # Function to extract test names and their corresponding results + test_type
    logger = prefect.context.get("logger")

    logger.info("Cleaning logs + labeling:")
    test_cleaned = relevant_logs['test_log'].str.extract(r'Failure in test (\w+) |Warning in test (\w+) ').bfill(axis=1)[0]
    results_cleaned = relevant_logs['results_log'].str.extract(r'Got (\w+) result')
    test_type = relevant_logs['results_log'].str.extract(r'configured to (\w+) if')

    relevant_logs = pd.merge(relevant_logs, test_cleaned, left_index=True, right_index=True)
    relevant_logs = pd.merge(relevant_logs, results_cleaned, left_index=True, right_index=True)
    relevant_logs = pd.merge(relevant_logs, test_type, left_index=True, right_index=True)

    relevant_logs.columns = ['test_log','results_log', 'test_id', 'results', 'test_type']

    relevant_logs['run_id'] = run_id
    relevant_logs['time_of_run'] = time_of_run

    output = relevant_logs[['run_id', 'time_of_run', 'test_id', 'results', 'test_type']]
    logger.info(output)

    return output


from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()

@task
def df_to_snowflake(df, table_name, exist_option):
    # Function to push dataframe to Snowflake table
    logger = prefect.context.get("logger")

    logger.info("Pushing dataframe to Snowflake:")
    url = URL(
        account = str(secret['SNOWFLAKE_ACCOUNT']),
        user = str(secret['SNOWFLAKE_USERNAME']),
        password = str(secret['SNOWFLAKE_PASSWORD']),
        database = str(secret['SNOWFLAKE_DB']),
        schema = str(secret['SNOWFLAKE_SCHEMA']),
        warehouse = str(secret['SNOWFLAKE_WAREHOUSE']),
        role = str(secret['SNOWFLAKE_ROLE']),
    )
    engine = create_engine(url)

    connection = engine.connect()
    df.to_sql(table_name, con = connection, index = False, chunksize=16000, if_exists = exist_option)
    logger.info("Successfully pushed to Snowflake")
    connection.close()
    engine.dispose()
##=========================================##
##=========================================##



##=========================================##
##================= FLOW ==================##
##=========================================##
with Flow("dbt-test-all-flow", run_config=RUN_CONFIG, storage=STORAGE, state_handlers=[post_to_slack]) as flow:
    pull_repo = pull_dbt_repo()

    deps = dbt(
        command="dbt deps",
        task_args={"name": "DBT: Dependencies"},
        upstream_tasks=[pull_repo],
    )

    test_dbt_run = dbt(
        command="dbt test",
        task_args={"name": "DBT: Test All"},
        upstream_tasks=[deps],
    )

    relevant_logs = get_relevant_logs(
        test_dbt_run,
        task_args={"name": "Getting dbt Task Logs + Filtering Relevant Logs", "trigger": all_finished},
        upstream_tasks=[test_dbt_run],
    )

    flow_run_id = get_flow_run_id(
        task_args={"name": "Getting Name of Run"},
    )

    time_of_flow_run = get_time_of_flow_run(
        task_args={"name": "Getting Time of Run"},
    )

    final_output = clean_logs(
        relevant_logs, flow_run_id, time_of_flow_run,
        task_args={"name": "Cleaning + Labeling Logs"},
        upstream_tasks=[time_of_flow_run],
    )

    df_to_snowflake(
        final_output, "dbt_test_errors", "append",
        task_args={"name": "Pushing Dataframe to Snowflake"},
        upstream_tasks=[final_output],
    )

    flow.set_reference_tasks([test_dbt_run])
##=========================================##
##=========================================##
upvote 1
🙌 1
🙏 1
e
This is perfect, thank you so much @Ken Nguyen!
🎉 1