Does anyone have Slack alerts set up with dbt test...
# ask-community
m
Does anyone have Slack alerts set up with dbt tests on Prefect? Is it possible to send an alert with dbt source tests that are warning or failed?
a
You could do something like this:
Copy code
@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    SlackTask(message=output).run()
this post provides a full example
m
Ok this could potentially work- However, with the test command I get passing and warning tests- is there a pay to only send the dbt output messages that include "warn"?
Copy code
17:56:23  8 of 17 PASS freshness of google_sheets.misc_marketing_spend_winc_influencers... [PASS in 4.13s]
10:56:24
INFO
DbtShellTask
17:56:24  7 of 17 WARN freshness of google_sheets.misc_marketing_spend_winc............... [WARN in 4.39s]
EX of PASS vs WARN ^
a
You would somehow need to filter by the log message content. It would be possible to do with something like Grafana - you could send all your dbt logs to some log aggregation service and then configure alerts based on message content Alternative: This could be a bit hacky, but you could do something like this in your flow:
Copy code
@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    for line in output:
        if "WARN" in line:
            SlackTask(message=output).run()
m
getting this error when adding that code, what's the best way to debug this?
Copy code
Beginning health checks...
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 152, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
    flows.append(cloudpickle.loads(flow_bytes))
TypeError: code() takes at most 15 arguments (16 given)
a
try adding checkpoint=False to the task decorator on this task, otherwise the task run result may not get serialized properly
m
hmm still getting the error
Copy code
@task(trigger=any_failed, checkpoint=False)
def send_slack_alert_on_test(output):
    SlackTask(message=output).run()
a
even though it actually looks like an error from Docker storage - you could disable the health checks on your Docker storage using
ignore_healthchecks=True
, but the health checks are for a reason, looks like something in your flow code cannot be serialized with cloudpickle. If you need help debugging this, can you share your flow code?
the full flow code - you can also share via DM
this is weird; your code looks fine! It could be some version mismatch between the base image you defined in the DockerFile and the python version used in your flow registration environment - can you cross-check if those versions are matching?
apart from that, those two tasks seem to be disconnected from the rest of your flow - just letting you know, it's likely you made that on purpose
m
I did get an error about the Python versions however the code works with that error when I don’t include that task so I figured it was something else
a
I see, perhaps worth matching the Python versions and trying again? doesn't hurt for sure
m
Yeah this is just in dev, in prod I will set a dependency between the syncs and dbt tests
👍 1
what's the best way to specify python version with prefect dockerfile?
a
you would set it in your base image
m
I don't currently specify any python version there
not having any luck with this
Copy code
FROM python:3.9-slim
RUN python -m pip install
this is my base image, isn't this created on Prefect's end?
Copy code
# specify a base image
FROM prefecthq/prefect:latest
Got it to work!
👍 1
Copy code
Unexpected error: AttributeError("'FunctionTask' object has no attribute 'retry_on'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 1000, in check_for_retry
    self.task.retry_on
AttributeError: 'FunctionTask' object has no attribute 'retry_on'
seeing this error when running in prod ^
Also getting a bad request for the url although it's the one generated for Prefect for my channel
a
retry_on
is a recently added argument to the Task class, which makes me believe there is still some version mismatch between your registration and your prod runtime environments. Looks like your registration/build environment uses a more recent version of Prefect where this new argument exists, while your runtime/production environment is on an older version of Prefect and doesn't yet have this argument
retry_on
m
how do I update these environments so that they match?
is there something in my dashboard I need to change? or is it with my Docker contain? my file?
a
I believe if you e.g. want the most recent Prefect version in both environments, you could do:
Copy code
pip install prefect --upgrade
in both
m
locally and where else?
a
your registration environment (probably your development machine or CI system) and the Dockerfile if you explicitly set the image version there. If you don't use Dockerfile, Prefect will try to match the Prefect version with your environment
m
For some reason I'm not receiving the messages with "WARN"- will printing out the line here print to the Prefect logs?
Copy code
@task(trigger=all_finished, checkpoint=False)
def send_slack_alert_on_test(output):
    for line in output:
        if "WARN" in line:
            print(line)
            SlackTask(message=line).run()
I did this but nothing is being printed:
Copy code
@task(trigger=all_finished, checkpoint=False, log_stdout=True)
def send_slack_alert_on_test(output):
    for line in output:
        if "WARN" in line:
            print(line)
            SlackTask(message=line).run()
a
will printing out the line here print to the Prefect logs?
A really good question! I think it should work, as long as you add
log_stdout=True
to your task decorator (which you did) 🤔 Alternatively, you could do the same using logger which is safer:
Copy code
logger = prefect.context.get("logger")
for line in output:
        if "WARN" in line:
            <http://logger.info|logger.info>(line)
m
hmmm yeah nothing is printing out
any ideas why?
a
honestly no, I don't. Can you try with a simple flow and a simple dbt DAG and try to tackle it more step by step? does this work without filtering for WARN messages only?
m
no it doesn't
so it looks like output is just that last message in a task run
so it doesn't actually give you each line
Copy code
def send_slack_alert_on_test(output):
    print(output)
so this would only print that
20:35:55  Done.
message
so I guess is there a way to send all the messages from a dbt run to this function rather than just that last message?
a
Yes there is! you would need to do that instead:
Copy code
logger = prefect.context.get("logger")
if "WARN" in output:
    <http://logger.info|logger.info>(output) # or put it into Slack task
basically not iterating over each line of output would give you the entire output at once
m
when I just print the entire output I only get that last message
I think I found the solution in the docs- I need to add
return_all=True
to my dbt task
a
that's true, I thought you have that already
have you seen my dbt blog post series? I'm no big dbt user but pretty much most of what I know about orchestrating dbt DAGs is there, including getting logs from the runs - if you are interested it's here
m
cool thanks!
👍 1