
Jovan Sakovic

01/29/2022, 11:30 AM
The Slack Notifier: trying to get the error message of a dbt run into Slack, and I have set up the state_handler to only handle the Failed state. But the DbtShellTask raises the FAIL signal with the first message that occurs, "Command failed with exit code 2", and the notifier never gets to the error that I actually want: dbt's error. Is there a way to push these other error messages to the notifier?
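(For context, the setup being described looks roughly like this; a minimal sketch assuming Prefect 1.x, with placeholder profile values:)

# Minimal sketch of the setup described above (Prefect 1.x; profile values
# are placeholders). The notifier only fires on the Failed state, but the
# state it sees carries "Command failed with exit code 2" rather than
# dbt's own error output.
from prefect import Flow
from prefect.engine.state import Failed
from prefect.tasks.dbt import DbtShellTask
from prefect.utilities.notifications import slack_notifier

handler = slack_notifier(only_states=[Failed])

dbt = DbtShellTask(
    profile_name="default",   # placeholder
    environment="dev",        # placeholder
    state_handlers=[handler],
)

with Flow("dbt-example") as flow:
    dbt(command="dbt run")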

Anna Geller

01/29/2022, 11:48 AM
A couple of resources you can try:
• this state handler adds the entire traceback to a Slack message (can be verbose, though): https://gist.github.com/anna-geller/2014180ee5eaec9ea54f4d3f5b98ca93
• this post shows an example dbt flow where the output of dbt is always logged, even if the job fails (via the all_finished trigger): https://www.prefect.io/blog/flow-of-flows-orchestrating-elt-with-prefect-and-dbt/
import prefect
from prefect import task
from prefect.triggers import all_finished

@task(trigger=all_finished)
def print_dbt_output(output):
    logger = prefect.context.get("logger")
    for line in output:
        logger.info(line)
🙏 2
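(For the first option, a custom state handler is roughly shaped like this; a minimal sketch assuming Prefect 1.x and a configured SLACK_WEBHOOK_URL secret. The linked gist additionally formats the full traceback:)

# Sketch of a custom state handler that forwards the failure message to Slack.
# State handlers receive (task, old_state, new_state) and return a state.
from prefect.tasks.notifications import SlackTask

def post_failure_to_slack(task, old_state, new_state):
    if new_state.is_failed():
        SlackTask(message=f"Task {task.name} failed: {new_state.message}").run()
    return new_state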

Jovan Sakovic

01/29/2022, 1:54 PM
Hey Anna, thanks a bunch! This pointed me in the right direction 🙌 The first option was very verbose, but still didn't output the actual dbt error. The second option is getting there; I just need to get this to Slack instead of the logs. So I'm trying to build a combination of the two: 👉 a notification task that is mapped to all the dbt tasks I'm running (deps, seed, run), triggered by any_failed, and that collects their output into a message to be sent to Slack. I guess this works fine if I use the SlackTask there, but now it got me wondering how to use the state handler on this mapped task 🤔
The List task does not propagate the failed state of the upstream tasks.
I tried the following:
from prefect import task, Flow
from prefect.engine.signals import FAIL
from prefect.engine.state import Failed
from prefect.triggers import any_failed
from prefect.utilities.notifications import slack_notifier

slack_handler = slack_notifier(only_states=[Failed])

@task(trigger=any_failed, state_handlers=[slack_handler])
def trigger_slack(output):
    msg = "\n".join(output)
    raise FAIL(msg)

# dbt is a DbtShellTask instance defined elsewhere
with Flow('run-dbt') as flow:
    dbt_deps = dbt(command="dbt deps", task_args={"name": "dbt dependencies"})
    dbt_seed = dbt(command="dbt seed", upstream_tasks=[dbt_deps], task_args={"name": "dbt seed"})
    dbt_run = dbt(command="dbt run", upstream_tasks=[dbt_seed], task_args={"name": "dbt run"})

    trigger_slack.map([dbt_deps, dbt_seed, dbt_run],
                      task_args={"name": "dbt Fail Output"})
I'm thinking of two ways:
• I need to reduce the states of the List task to check whether there are failed states (is it possible to get the state of a task that is in the .map()'s array of tasks? 🤔 edit: I may be looking for this? 🕵️‍♂️)
• or trigger the List task with a Failed state
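(For reference, not from the thread: in Prefect 1.x a trigger is just a function that receives a dict mapping upstream edges to their states, so a custom trigger can inspect those states directly. A minimal sketch:)

# Minimal sketch of a custom trigger (Prefect 1.x). Triggers receive a dict
# of Edge -> State for all upstream tasks; returning False puts the task in
# a TriggerFailed state instead of running it.
def run_if_any_upstream_failed(upstream_states):
    return any(state.is_failed() for state in upstream_states.values())

# usage: @task(trigger=run_if_any_upstream_failed)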

Anna Geller

01/29/2022, 2:29 PM
If the trigger_slack task has the any_failed trigger, then you only need to call it in your flow and specify that this task depends on the dbt tasks:
from prefect import task
from prefect.tasks.notifications import SlackTask
from prefect.triggers import any_failed

@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    SlackTask(message=output).run()

# in Flow block:
    send_slack_alert_on_failure(dbt_run)
    send_slack_alert_on_failure(dbt_test)
So there is no need for mapping here, imo, and I think it's cleaner to call it separately on each dbt task to explicitly show that it sends an alert on failure (and you can even name it to visualize it nicely in the DAG). Btw, why do you run dbt_seed, dbt_run, and dbt_deps in parallel? Does it make sense to start dbt_run unless we are sure that seed and deps were successful? Here is a full gist you can use as a template: https://gist.github.com/1ee3a57076361fee4bb633ef86f7d989
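(Putting the pieces together, a sketch assuming Prefect 1.x, with dbt and send_slack_alert_on_failure defined as in the earlier snippets; the gist above is the authoritative template:)

from prefect import Flow

# `dbt` is the DbtShellTask instance and `send_slack_alert_on_failure` the
# any_failed-triggered task from the snippets above; names are illustrative.
with Flow("run-dbt") as flow:
    dbt_deps = dbt(command="dbt deps")
    dbt_seed = dbt(command="dbt seed", upstream_tasks=[dbt_deps])
    dbt_run = dbt(command="dbt run", upstream_tasks=[dbt_seed])

    # Each alert task depends on exactly one dbt task and only runs
    # (and posts that task's output to Slack) if that task failed.
    send_slack_alert_on_failure(dbt_seed, task_args={"name": "seed alert"})
    send_slack_alert_on_failure(dbt_run, task_args={"name": "run alert"})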

Jovan Sakovic

01/29/2022, 2:55 PM
That's fair enough, I'll go with SlackTask, thank you! I thought I didn't run them in parallel? 😅 They have upstream_tasks set up 🤔

Kevin Kho

01/29/2022, 3:43 PM
+1, don't use slack_notifier. It's just not as customizable. Yes, it looks sequential to me.
🙌 1

Jovan Sakovic

01/29/2022, 4:39 PM
Thank you, both! Settled on taking the slack_notifier apart, so now I'm manually sending a POST request with a similarly constructed payload (for now) 👌
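(Something along these lines; a minimal sketch assuming a Slack incoming webhook, with the env var name as a placeholder:)

# Minimal sketch of posting a failure message to Slack directly
# (assumes an incoming webhook; SLACK_WEBHOOK_URL is a placeholder env var).
import os
import requests

def notify_slack(message: str) -> None:
    resp = requests.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={"text": message},
        timeout=10,
    )
    resp.raise_for_status()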

Anna Geller

01/29/2022, 7:33 PM
Your DAG showed them in one line so it looked like they weren't sequential. All good👍