Ken Nguyen
04/02/2022, 9:05 AM
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.triggers import all_finished

with Flow("data-quality-tracking-model-run-duration-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_run_flow_run_id = create_flow_run(
        flow_name="test-dbt-run-flow",
        project_name="data_quality_tracking",
    )
    flow_run = wait_for_flow_run(
        dbt_run_flow_run_id, raise_final_state=True, stream_logs=True
    )
    flow_logs = get_logs(
        dbt_run_flow_run_id,
        task_args={"name": "Getting logs", "trigger": all_finished},
        upstream_tasks=[flow_run],
    )
get_logs is a function that takes in a flow_run_id, creates a FlowRunView, then gets the logs of that FlowRunView. My issue is that despite having both upstream_tasks defined AND a FlowRunView.get_latest() call for the get_logs task, I’m still getting logs that are incomplete. Do you have any suggestions for why my get_logs function is prematurely retrieving logs of the child flow/retrieving incomplete logs? I’m also calling wait_for_flow_run with stream_logs=True, and I see incomplete logs in the streamed output too.
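A minimal sketch of what such a get_logs task could look like in Prefect 1.x, using the FlowRunView API described above (the task body here is an assumption, not Ken's exact code):

import prefect
from prefect import task
from prefect.backend import FlowRunView

@task
def get_logs(flow_run_id: str):
    # Build a view of the child flow run and refresh it to the latest backend state
    flow_run_view = FlowRunView.from_flow_run_id(flow_run_id)
    flow_run_view = flow_run_view.get_latest()
    # Fetch all logs recorded for that flow run in the backend
    return flow_run_view.get_logs()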
Anna Geller
To recap, your parent flow currently does:
1. create_flow_run kicks off the child "test-dbt-run-flow",
2. wait_for_flow_run waits for that child flow run to finish,
3. get_logs builds a FlowRunView for the child run,
4. get_logs() retrieves the dbt logs from the backend,
5. finally, you can do something with those retrieved logs (if I recall, you wanted to send it to some log aggregation service?)
The dbt_run already contains all dbt logs automatically, as long as you set those arguments to True:
dbt = DbtShellTask(
    return_all=True,
    log_stdout=True,
    log_stderr=True,
)
You can cut the steps 2-4, leaving only 1 and 5 if you move the functionality that needs to do something with your dbt logs to your "test-dbt-run-flow".
dbt_run = dbt(
    command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials
)
task_sending_dbt_logs_somewhere(dbt_run)
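Put together, the simplified child flow could look roughly like this; send_logs_to_aggregator is a hypothetical stand-in for the log handling, and db_credentials for the Snowflake connection details:

from prefect import Flow, task
from prefect.tasks.dbt.dbt import DbtShellTask

dbt = DbtShellTask(return_all=True, log_stdout=True, log_stderr=True)
db_credentials = {}  # hypothetical: the Snowflake connection details for dbt

@task
def send_logs_to_aggregator(dbt_logs):
    # dbt_logs is the list of output lines DbtShellTask returns when return_all=True
    ...

with Flow("test-dbt-run-flow") as flow:
    dbt_run = dbt(command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials)
    # The task's return value already holds every dbt log line,
    # so no backend round-trip (steps 2-4) is needed
    send_logs_to_aggregator(dbt_run)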
Ken Nguyen
04/02/2022, 5:04 PM
> You can cut the steps 2-4, leaving only 1 and 5 if you move the functionality that needs to do something with your dbt logs to your “test-dbt-run-flow”.
Would you be able to pass on the dbt logs (list object) to another flow instead of moving the log handling process to test-dbt-run-flow? This is not an absolute need, but I’m asking because I have a few dbt flows that I might want to apply the same log handling process to.
Anna Geller
> I am still very curious to know why my current method was retrieving incomplete logs
Perhaps this was just some latency issue? It can take some time before the logs are written (we write them in batches on the backend side) - it could be that your child flow run was trying to retrieve those logs before they were written.
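If that is the cause, one hedged workaround is to re-fetch the logs until the count stops growing before using them (the function name and polling parameters below are made up for illustration):

import time
from prefect.backend import FlowRunView

def get_settled_logs(flow_run_id, attempts=5, wait_seconds=10):
    previous_count = -1
    logs = []
    for _ in range(attempts):
        logs = FlowRunView.from_flow_run_id(flow_run_id).get_logs()
        if len(logs) == previous_count:
            break  # no new logs arrived since the last poll
        previous_count = len(logs)
        time.sleep(wait_seconds)
    return logs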
> I’m asking because I have a few dbt flows that I might want to apply the same log handling process to
You can always call the same log handling callable within your task in any flow that needs it. You could write a custom module for it and import it in any module that needs it.
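For example, a shared module along these lines (module and function names are assumptions) could be imported by every dbt flow that needs the same log handling:

# dbt_log_handling.py
import pandas as pd

def logs_to_dataframe(dbt_logs):
    # Turn the list of dbt log lines returned by DbtShellTask into a dataframe
    return pd.DataFrame(dbt_logs, columns=["message"])

Any flow's task can then call dbt_log_handling.logs_to_dataframe(dbt_run).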
Edvard Kristiansen
05/09/2022, 3:56 PM
Ken Nguyen
05/09/2022, 4:43 PM
My flow runs dbt test, and then it takes the logs from that and makes them into a dataframe which I push into Snowflake (from there I feed it into a Looker dashboard).
##=========================================##
##=========== DBTSHELLTASK SETUP ==========##
##=========================================##
import prefect
from prefect import Flow, task
from prefect.client import Secret
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.triggers import all_finished

secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()
dbt = DbtShellTask(
    return_all=True,
    profile_name="drsquatch_dev",
    environment="dev",
    # profiles_dir=".",
    overwrite_profiles=True,
    log_stdout=True,
    helper_script="cd dbt/dbt",
    log_stderr=True,
    stream_output=True,
    dbt_kwargs={
        "type": "snowflake",
        "account": secret['SNOWFLAKE_ACCOUNT'],
        "user": secret['SNOWFLAKE_USERNAME'],
        "password": secret['SNOWFLAKE_PASSWORD'],
        "role": secret['SNOWFLAKE_ROLE'],
        "database": str(secret['SNOWFLAKE_DB']),
        "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
        "schema": str(secret['SNOWFLAKE_SCHEMA']),
        "threads": 12,
        "client_session_keep_alive": False,
    },
    # state_handlers=[post_to_slack]
)
##=========================================##
##=========================================##
##=========================================##
##========= LOGS PROCESSING TASKS =========##
##=========================================##
import pandas as pd

@task
def get_relevant_logs(dbt_run):
    # Get the dbt task logs and filter down to the relevant records only
    logger = prefect.context.get("logger")
    logger.info("Getting raw logs from dbt task:")
    logger.info(dbt_run)
    logger.info("Converting raw logs into a dataframe:")
    raw_logs_df = pd.DataFrame(dbt_run)
    raw_logs_df.columns = ['message']
    logger.info(raw_logs_df)
    logger.info("Select only relevant records from flow_logs (dbt tests and results):")
    # Find records that contain dbt test failures/warnings
    test_log = raw_logs_df[raw_logs_df['message'].str.contains('Failure in test |Warning in test ')]
    test_log = test_log['message']
    test_index = test_log.index
    # Each result line immediately follows its test line, so shift the index by one
    results_index = [i + 1 for i in test_index]
    results = raw_logs_df.iloc[results_index]
    results = results['message']
    # Shift the results index back so it lines up with test_index for joining
    results.index -= 1
    logger.info("Combining test and results into one dataframe:")
    output = pd.merge(test_log, results, left_index=True, right_index=True)
    output.columns = ['test_log', 'results_log']
    logger.info(output)
    return output
@task
def get_flow_run_id():
    # Get flow_run_name (but called flow_run_id here) from context
    logger = prefect.context.get("logger")
    logger.info("Getting name of run for labeling purposes:")
    output = prefect.context.get("flow_run_name")
    logger.info(output)
    return output
import datetime

@task
def get_time_of_flow_run():
    # Use the current timestamp as the time of run
    logger = prefect.context.get("logger")
    logger.info("Getting time of run for labeling purposes:")
    output = datetime.datetime.now()
    logger.info(output)
    return output
@task
def clean_logs(relevant_logs, run_id, time_of_run):
    # Extract test names and their corresponding results + test_type
    logger = prefect.context.get("logger")
    logger.info("Cleaning logs + labeling:")
    test_cleaned = relevant_logs['test_log'].str.extract(r'Failure in test (\w+) |Warning in test (\w+) ').bfill(axis=1)[0]
    results_cleaned = relevant_logs['results_log'].str.extract(r'Got (\w+) result')
    test_type = relevant_logs['results_log'].str.extract(r'configured to (\w+) if')
    relevant_logs = pd.merge(relevant_logs, test_cleaned, left_index=True, right_index=True)
    relevant_logs = pd.merge(relevant_logs, results_cleaned, left_index=True, right_index=True)
    relevant_logs = pd.merge(relevant_logs, test_type, left_index=True, right_index=True)
    relevant_logs.columns = ['test_log', 'results_log', 'test_id', 'results', 'test_type']
    relevant_logs['run_id'] = run_id
    relevant_logs['time_of_run'] = time_of_run
    output = relevant_logs[['run_id', 'time_of_run', 'test_id', 'results', 'test_type']]
    logger.info(output)
    return output
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()

@task
def df_to_snowflake(df, table_name, exist_option):
    # Push a dataframe to a Snowflake table
    logger = prefect.context.get("logger")
    logger.info("Pushing dataframe to Snowflake:")
    url = URL(
        account=str(secret['SNOWFLAKE_ACCOUNT']),
        user=str(secret['SNOWFLAKE_USERNAME']),
        password=str(secret['SNOWFLAKE_PASSWORD']),
        database=str(secret['SNOWFLAKE_DB']),
        schema=str(secret['SNOWFLAKE_SCHEMA']),
        warehouse=str(secret['SNOWFLAKE_WAREHOUSE']),
        role=str(secret['SNOWFLAKE_ROLE']),
    )
    engine = create_engine(url)
    connection = engine.connect()
    df.to_sql(table_name, con=connection, index=False, chunksize=16000, if_exists=exist_option)
    logger.info("Successfully pushed to Snowflake")
    connection.close()
    engine.dispose()
##=========================================##
##=========================================##
##=========================================##
##================= FLOW ==================##
##=========================================##
with Flow("dbt-test-all-flow", run_config=RUN_CONFIG, storage=STORAGE, state_handlers=[post_to_slack]) as flow:
pull_repo = pull_dbt_repo()
deps = dbt(
command="dbt deps",
task_args={"name": "DBT: Dependencies"},
upstream_tasks=[pull_repo],
)
test_dbt_run = dbt(
command="dbt test",
task_args={"name": "DBT: Test All"},
upstream_tasks=[deps],
)
relevant_logs = get_relevant_logs(
test_dbt_run,
task_args={"name": "Getting dbt Task Logs + Filtering Relevant Logs", "trigger": all_finished},
upstream_tasks=[test_dbt_run],
)
flow_run_id = get_flow_run_id(
task_args={"name": "Getting Name of Run"},
)
time_of_flow_run = get_time_of_flow_run(
task_args={"name": "Getting Time of Run"},
)
final_output = clean_logs(
relevant_logs, flow_run_id, time_of_flow_run,
task_args={"name": "Cleaning + Labeling Logs"},
upstream_tasks=[time_of_flow_run],
)
df_to_snowflake(
final_output, "dbt_test_errors", "append",
task_args={"name": "Pushing Dataframe to Snowflake"},
upstream_tasks=[final_output],
)
flow.set_reference_tasks([test_dbt_run])
##=========================================##
##=========================================##