Ken Nguyen
04/02/2022, 9:05 AM
with Flow("data-quality-tracking-model-run-duration-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
  dbt_run_flow_run_id = create_flow_run(
      flow_name="test-dbt-run-flow",
      project_name="data_quality_tracking"
  )
  flow_run = wait_for_flow_run(
      dbt_run_flow_run_id, raise_final_state=True, stream_logs=True
  )
  flow_logs = get_logs(
      dbt_run_flow_run_id,
      task_args={"name": "Getting logs", "trigger": all_finished},
      upstream_tasks=[flow_run],
  )
My get_logs task takes the flow_run_id and creates a FlowRunView, then uses FlowRunView's get_logs method, but get_logs seems to be retrieving incomplete logs.
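Roughly, that task looks like this (a minimal sketch, assuming Prefect 1.x's FlowRunView API):

from prefect import task
from prefect.backend import FlowRunView

@task
def get_logs(flow_run_id):
    # Look up the child flow run and fetch whatever logs it has written so far
    flow_run_view = FlowRunView.from_flow_run_id(flow_run_id)
    return flow_run_view.get_logs()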
Ken Nguyen
04/02/2022, 9:09 AM
stream_logs=True

Ken Nguyen
04/02/2022, 9:10 AM

Anna Geller
Instead of get_logs(), you could take the logs straight from the return value of your dbt_run task:

dbt = DbtShellTask(
    return_all=True,
    log_stdout=True,
    log_stderr=True,
)
dbt_run = dbt(
    command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials
)
task_sending_dbt_logs_somewhere(dbt_run)
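With return_all=True the task's return value is the full list of output lines, so the downstream task simply receives a list of strings. A minimal sketch of such a consumer (task_sending_dbt_logs_somewhere above is a placeholder name):

from prefect import task

@task
def task_sending_dbt_logs_somewhere(log_lines):
    # log_lines is the list of stdout/stderr lines returned by DbtShellTask
    failures = [line for line in log_lines if "Failure in test" in line]
    # ... forward these to Slack, a database, etc.
    return failures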
Anna Geller

Ken Nguyen
04/02/2022, 5:04 PM

Ken Nguyen
04/02/2022, 6:09 PM
> You can cut the steps 2-4, leaving only 1 and 5 if you move the functionality that needs to do something with your dbt logs to your “test-dbt-run-flow”.
Would you be able to pass on the dbt logs (list object) to another flow instead of moving the log handling process to test-dbt-run-flow? This is not an absolute need, but I’m asking because I have a few dbt flows that I might want to apply the same log handling process to.
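For illustration, since the dbt logs are just a list of strings, one way to hand them to a separate flow would be to pass them as a flow run parameter when triggering it (a sketch; the handler flow name and handle_logs task are made up):

from prefect import Flow, Parameter, task
from prefect.tasks.prefect import create_flow_run

@task
def handle_logs(log_lines):
    # Hypothetical shared log handling logic
    ...

# Handler flow: receives the dbt log lines as a parameter
with Flow("dbt-log-handling-flow") as log_flow:
    dbt_logs = Parameter("dbt_logs")
    handle_logs(dbt_logs)

# Any dbt flow: forward the captured lines when triggering the handler
# (reusing the DbtShellTask instance `dbt` from above)
with Flow("test-dbt-run-flow") as dbt_flow:
    dbt_run = dbt(command="dbt run", task_args={"name": "DBT Run"})
    create_flow_run(
        flow_name="dbt-log-handling-flow",
        parameters={"dbt_logs": dbt_run},
    )

Note that parameters are stored with the flow run in the backend, so this fits modestly sized log output.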
Anna Geller
> I am still very curious to know why my current method was retrieving incomplete logs
Perhaps this was just some latency issue? It can take some time before the logs are written (we write them in batches on the backend side) - it could be that your child flow run was trying to retrieve those logs before they were written.
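If that is the case, one illustrative workaround would be to give the log fetch a few retries with a delay (assuming Prefect 1.x task retry options; the empty-logs check is a made-up heuristic):

from datetime import timedelta
from prefect import task
from prefect.backend import FlowRunView

@task(max_retries=3, retry_delay=timedelta(seconds=30))
def get_logs(flow_run_id):
    logs = FlowRunView.from_flow_run_id(flow_run_id).get_logs()
    if not logs:
        # Fail this attempt so Prefect retries after the backend flushes its log batches
        raise ValueError("No logs written yet")
    return logs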
> I’m asking because I have a few dbt flows that I might want to apply the same log handling process to
You can always call the same log handling callable within your task in any flow that needs it. You could write a custom module for it and import it into any module that needs it.
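That could be as small as a shared module (a sketch; the module and function names are hypothetical):

# dbt_log_utils.py - shared log handling, importable from any flow module
import pandas as pd

def extract_test_results(log_lines):
    # Keep only the dbt test failure/warning lines from the raw output
    df = pd.DataFrame(log_lines, columns=["message"])
    return df[df["message"].str.contains("Failure in test |Warning in test ")]

# then, in any flow module that needs it:
# from dbt_log_utils import extract_test_results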
Ken Nguyen
04/02/2022, 8:10 PM

Anna Geller
Edvard Kristiansen
05/09/2022, 3:56 PM

Ken Nguyen
05/09/2022, 4:43 PM
Here is the full dbt test flow:
##=========================================##
##=========== DBTSHELLTASK SETUP ==========##
##=========================================##
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.client import Secret

secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()
dbt = DbtShellTask(
    return_all=True,
    profile_name="drsquatch_dev",
    environment="dev",
    # profiles_dir=".",
    overwrite_profiles=True,
    log_stdout=True,
    helper_script="cd dbt/dbt",
    log_stderr=True,
    stream_output=True,
    dbt_kwargs={
        "type": "snowflake",
        "account": secret['SNOWFLAKE_ACCOUNT'],
        "user": secret['SNOWFLAKE_USERNAME'],
        "password": secret['SNOWFLAKE_PASSWORD'],
        "role": secret['SNOWFLAKE_ROLE'],
        "database": str(secret['SNOWFLAKE_DB']),
        "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
        "schema": str(secret['SNOWFLAKE_SCHEMA']),
        "threads": 12,
        "client_session_keep_alive": False,
    },
    # state_handlers=[post_to_slack]
)
##=========================================##
##=========================================##
##=========================================##
##========= LOGS PROCESSING TASKS =========##
##=========================================##
import pandas as pd
import prefect
from prefect import task
from prefect.triggers import all_finished

@task
def get_relevant_logs(dbt_run):
    # Get the dbt task logs and filter down to only the relevant lines
    logger = prefect.context.get("logger")
    logger.info("Getting raw logs from dbt task:")
    logger.info(dbt_run)
    logger.info("Converting raw logs into a dataframe:")
    raw_logs_df = pd.DataFrame(dbt_run)
    raw_logs_df.columns = ['message']
    logger.info(raw_logs_df)
    logger.info("Select only relevant records from flow_logs (dbt tests and results):")
    test_log = raw_logs_df[raw_logs_df['message'].str.contains('Failure in test |Warning in test ')]
    test_log = test_log['message']
    # Find records that contain dbt tests
    test_index = test_log.index
    results_index = [i+1 for i in test_index]
    # Get index of results based on index of tests from dataframe
    results = raw_logs_df.iloc[results_index]
    results = results['message']
    results.index -= 1
    # Get results, adjust index to match with test_index for joining
    logger.info("Combining test and results into one dataframe:")
    output = pd.merge(test_log, results, left_index=True, right_index=True)
    output.columns = ['test_log','results_log']
    logger.info(output)
    return output
@task
def get_flow_run_id():
    # Get the flow_run_name from context (called flow_run_id here) for labeling
    logger = prefect.context.get("logger")
    logger.info("Getting name of run for labeling purposes:")
    output = prefect.context.get("flow_run_name")
    logger.info(output)
    return output
import datetime
@task
def get_time_of_flow_run():
    # Record the current time as the time of this flow run
    logger = prefect.context.get("logger")
    logger.info("Getting time of run for labeling purposes:")
    output = datetime.datetime.now()
    logger.info(output)
    return output
@task
def clean_logs(relevant_logs, run_id, time_of_run):
    # Extract test names and their corresponding results + test_type
    logger = prefect.context.get("logger")
    logger.info("Cleaning logs + labeling:")
    test_cleaned = relevant_logs['test_log'].str.extract(r'Failure in test (\w+) |Warning in test (\w+) ').bfill(axis=1)[0]
    results_cleaned = relevant_logs['results_log'].str.extract(r'Got (\w+) result')
    test_type = relevant_logs['results_log'].str.extract(r'configured to (\w+) if')
    relevant_logs = pd.merge(relevant_logs, test_cleaned, left_index=True, right_index=True)
    relevant_logs = pd.merge(relevant_logs, results_cleaned, left_index=True, right_index=True)
    relevant_logs = pd.merge(relevant_logs, test_type, left_index=True, right_index=True)
    relevant_logs.columns = ['test_log','results_log', 'test_id', 'results', 'test_type']
    relevant_logs['run_id'] = run_id
    relevant_logs['time_of_run'] = time_of_run
    output = relevant_logs[['run_id', 'time_of_run', 'test_id', 'results', 'test_type']]
    logger.info(output)
    return output
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()
@task
def df_to_snowflake(df, table_name, exist_option):
    # Push a dataframe to a Snowflake table
    logger = prefect.context.get("logger")
    logger.info("Pushing dataframe to Snowflake:")
    url = URL(
        account=str(secret['SNOWFLAKE_ACCOUNT']),
        user=str(secret['SNOWFLAKE_USERNAME']),
        password=str(secret['SNOWFLAKE_PASSWORD']),
        database=str(secret['SNOWFLAKE_DB']),
        schema=str(secret['SNOWFLAKE_SCHEMA']),
        warehouse=str(secret['SNOWFLAKE_WAREHOUSE']),
        role=str(secret['SNOWFLAKE_ROLE']),
    )
    engine = create_engine(url)
    connection = engine.connect()
    df.to_sql(table_name, con=connection, index=False, chunksize=16000, if_exists=exist_option)
    logger.info("Successfully pushed to Snowflake")
    connection.close()
    engine.dispose()
##=========================================##
##=========================================##
##=========================================##
##================= FLOW ==================##
##=========================================##
with Flow("dbt-test-all-flow", run_config=RUN_CONFIG, storage=STORAGE, state_handlers=[post_to_slack]) as flow:
    pull_repo = pull_dbt_repo()
    deps = dbt(
        command="dbt deps",
        task_args={"name": "DBT: Dependencies"},
        upstream_tasks=[pull_repo],
    )
    test_dbt_run = dbt(
        command="dbt test",
        task_args={"name": "DBT: Test All"},
        upstream_tasks=[deps],
    )
    relevant_logs = get_relevant_logs(
        test_dbt_run,
        task_args={"name": "Getting dbt Task Logs + Filtering Relevant Logs", "trigger": all_finished},
        upstream_tasks=[test_dbt_run],
    )
    flow_run_id = get_flow_run_id(
        task_args={"name": "Getting Name of Run"},
    )
    time_of_flow_run = get_time_of_flow_run(
        task_args={"name": "Getting Time of Run"},
    )
    final_output = clean_logs(
        relevant_logs, flow_run_id, time_of_flow_run,
        task_args={"name": "Cleaning + Labeling Logs"},
        upstream_tasks=[time_of_flow_run],
    )
    df_to_snowflake(
        final_output, "dbt_test_errors", "append",
        task_args={"name": "Pushing Dataframe to Snowflake"},
        upstream_tasks=[final_output],
    )
    flow.set_reference_tasks([test_dbt_run])
##=========================================##
##=========================================##

Edvard Kristiansen
05/10/2022, 7:26 AM