#prefect-community

Madison Schott

04/29/2022, 5:42 PM
Does anyone have Slack alerts set up with dbt tests on Prefect? Is it possible to send an alert with dbt source tests that are warning or failed?

Anna Geller

04/29/2022, 5:47 PM
You could do something like this:
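from prefect import task
from prefect.tasks.notifications import SlackTask
from prefect.triggers import any_failed

@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    SlackTask(message=output).run()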
this post provides a full example

Madison Schott

04/29/2022, 6:01 PM
OK, this could potentially work. However, with the test command I get both passing and warning tests. Is there a way to only send the dbt output messages that include "warn"?
17:56:23  8 of 17 PASS freshness of google_sheets.misc_marketing_spend_winc_influencers... [PASS in 4.13s]
10:56:24
INFO
DbtShellTask
17:56:24  7 of 17 WARN freshness of google_sheets.misc_marketing_spend_winc............... [WARN in 4.39s]
EX of PASS vs WARN ^

Anna Geller

04/29/2022, 6:09 PM
You would need to filter by the log message content. One option is something like Grafana: you could send all your dbt logs to a log aggregation service and then configure alerts based on message content. Alternative: this could be a bit hacky, but you could do something like this in your flow:
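@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    # Assumes output is a list of log lines, not a single string
    for line in output:
        if "WARN" in line:
            # Send only the matching line, not the whole output each time
            SlackTask(message=line).run()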
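A minimal sketch of the filtering step on its own, without Prefect, so it can be tested against the sample log lines from this thread. It matches dbt result lines of the form "N of M STATUS ..." and collects the matches into one message instead of one alert per line. The helper name `build_alert` and the regex are illustrative assumptions; FAIL and ERROR are assumed to be the other dbt statuses worth alerting on (only PASS and WARN appear in the logs above).

```python
import re

# Matches dbt result lines such as
# "17:56:24  7 of 17 WARN freshness of ... [WARN in 4.39s]"
RESULT_LINE = re.compile(r"\b\d+ of \d+ (PASS|WARN|FAIL|ERROR)\b")


def build_alert(lines, statuses=("WARN", "FAIL", "ERROR")):
    """Return one message with only the result lines whose status is in
    `statuses`, or None when there is nothing to report."""
    hits = []
    for line in lines:
        match = RESULT_LINE.search(line)
        if match and match.group(1) in statuses:
            hits.append(line.strip())
    return "\n".join(hits) if hits else None
```

Inside the alert task, the result could be passed to SlackTask(message=...).run() once, and skipped entirely when it is None.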

Madison Schott

04/29/2022, 6:54 PM
getting this error when adding that code, what's the best way to debug this?
Beginning health checks...
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 152, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
    flows.append(cloudpickle.loads(flow_bytes))
TypeError: code() takes at most 15 arguments (16 given)

Anna Geller

04/29/2022, 7:28 PM
try adding checkpoint=False to the task decorator on this task, otherwise the task run result may not get serialized properly

Madison Schott

04/29/2022, 7:31 PM
hmm still getting the error
@task(trigger=any_failed, checkpoint=False)
def send_slack_alert_on_test(output):
    SlackTask(message=output).run()

Anna Geller

04/29/2022, 7:31 PM
Actually, this looks like an error from Docker storage. You could disable the health checks on your Docker storage using ignore_healthchecks=True, but the health checks are there for a reason. It looks like something in your flow code cannot be serialized with cloudpickle. If you need help debugging this, can you share your flow code?
the full flow code - you can also share via DM
This is weird; your code looks fine! It could be a version mismatch between the base image you defined in the Dockerfile and the Python version used in your flow registration environment. Can you cross-check whether those versions match?
Apart from that, those two tasks seem to be disconnected from the rest of your flow. Just letting you know; you probably did that on purpose.

Madison Schott

04/29/2022, 7:42 PM
I did get an error about the Python versions. However, the code works despite that error when I don't include that task, so I figured it was something else.

Anna Geller

04/29/2022, 7:43 PM
I see, perhaps worth matching the Python versions and trying again? doesn't hurt for sure

Madison Schott

04/29/2022, 7:43 PM
Yeah this is just in dev, in prod I will set a dependency between the syncs and dbt tests
👍 1
what's the best way to specify python version with prefect dockerfile?

Anna Geller

04/29/2022, 8:07 PM
you would set it in your base image

Madison Schott

04/29/2022, 8:12 PM
I don't currently specify any python version there
not having any luck with this
FROM python:3.9-slim
RUN python -m pip install
this is my base image, isn't this created on Prefect's end?
# specify a base image
FROM prefecthq/prefect:latest
Got it to work!
👍 1
Unexpected error: AttributeError("'FunctionTask' object has no attribute 'retry_on'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 1000, in check_for_retry
    self.task.retry_on
AttributeError: 'FunctionTask' object has no attribute 'retry_on'
seeing this error when running in prod ^
Also getting a bad request for the url although it's the one generated for Prefect for my channel

Anna Geller

05/01/2022, 4:13 PM
retry_on is a recently added argument to the Task class, which makes me believe there is still some version mismatch between your registration and your prod runtime environments. The traceback shows the engine in your runtime/production environment reading self.task.retry_on, so it looks like that environment uses a more recent version of Prefect where this argument exists, while the flow was built in a registration/build environment on an older version of Prefect that doesn't yet set retry_on on the task.
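One way to check for the mismatch discussed here is to print the same version fingerprint in both the registration environment and the runtime image, then compare the two. The sketch below uses only the standard library. The helper names (`environment_fingerprint`, `mismatches`) and the default package list are illustrative assumptions, not part of Prefect.

```python
import sys
from importlib import metadata


def environment_fingerprint(packages=("prefect", "cloudpickle")):
    """Return the Python major.minor version plus the installed version of
    each named package (None when a package is not installed)."""
    versions = {"python": f"{sys.version_info.major}.{sys.version_info.minor}"}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def mismatches(first, second):
    """Return {key: (first_value, second_value)} for every differing entry."""
    keys = sorted(set(first) | set(second))
    return {k: (first.get(k), second.get(k)) for k in keys if first.get(k) != second.get(k)}


if __name__ == "__main__":
    # Run this in both environments and compare the printed dictionaries.
    print(environment_fingerprint())
```

Any key reported by mismatches() is a candidate for pinning to the same version in the Dockerfile and on the registration machine.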

Madison Schott

05/02/2022, 7:17 PM
how do I update these environments so that they match?
is there something in my dashboard I need to change? or is it with my Docker container? my file?

Anna Geller

05/02/2022, 7:46 PM
I believe if you e.g. want the most recent Prefect version in both environments, you could do:
pip install prefect --upgrade
in both

Madison Schott

05/02/2022, 7:47 PM
locally and where else?

Anna Geller

05/02/2022, 7:48 PM
Your registration environment (probably your development machine or CI system) and the Dockerfile, if you explicitly set the image version there. If you don't use a Dockerfile, Prefect will try to match the Prefect version with your environment.

Madison Schott

05/02/2022, 9:08 PM
For some reason I'm not receiving the messages with "WARN". Will printing out the line here print to the Prefect logs?
@task(trigger=all_finished, checkpoint=False)
def send_slack_alert_on_test(output):
    for line in output:
        if "WARN" in line:
            print(line)
            SlackTask(message=line).run()
I did this but nothing is being printed:
@task(trigger=all_finished, checkpoint=False, log_stdout=True)
def send_slack_alert_on_test(output):
    for line in output:
        if "WARN" in line:
            print(line)
            SlackTask(message=line).run()

Anna Geller

05/03/2022, 12:31 AM
will printing out the line here print to the Prefect logs?
A really good question! I think it should work, as long as you add log_stdout=True to your task decorator (which you did) 🤔 Alternatively, you could do the same using the logger, which is safer:
logger = prefect.context.get("logger")
for line in output:
    if "WARN" in line:
        logger.info(line)

Madison Schott

05/03/2022, 5:15 PM
hmmm yeah nothing is printing out
any ideas why?

Anna Geller

05/03/2022, 7:04 PM
honestly no, I don't. Can you try with a simple flow and a simple dbt DAG and try to tackle it more step by step? does this work without filtering for WARN messages only?

Madison Schott

05/03/2022, 7:51 PM
no it doesn't
so it looks like output is just that last message in a task run
so it doesn't actually give you each line
def send_slack_alert_on_test(output):
    print(output)
so this would only print that "20:35:55  Done." message
so I guess is there a way to send all the messages from a dbt run to this function rather than just that last message?

Anna Geller

05/03/2022, 8:51 PM
Yes, there is! You would need to do this instead:
logger = prefect.context.get("logger")
if "WARN" in output:
    logger.info(output)  # or pass it to the Slack task
basically not iterating over each line of output would give you the entire output at once

Madison Schott

05/03/2022, 8:56 PM
when I just print the entire output I only get that last message
I think I found the solution in the docs: I need to add return_all=True to my dbt task
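A small sketch of why the earlier loop matched nothing, and how the alert task could tolerate either output shape. When output is a single string (only the last line), "for line in output" iterates over individual characters, so "WARN" in line can never be true. With return_all=True the task is, as far as I understand the Prefect 1.x shell task docs, expected to return all stdout lines as a list. The helper names `as_lines` and `lines_containing` are hypothetical.

```python
def as_lines(output):
    """Normalize a task result to a list of lines.

    Accepts None, a single (possibly multi-line) string, or an iterable
    of lines, so callers never end up iterating over characters.
    """
    if output is None:
        return []
    if isinstance(output, str):
        return output.splitlines()
    return [str(item) for item in output]


def lines_containing(output, needle="WARN"):
    """Return the lines of `output` that contain `needle`."""
    return [line for line in as_lines(output) if needle in line]
```

For example, lines_containing("20:35:55  Done.") returns an empty list, while a list of result lines returns only the entries that contain WARN.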

Anna Geller

05/03/2022, 9:27 PM
that's true, I thought you had that already
have you seen my dbt blog post series? I'm no big dbt user but pretty much most of what I know about orchestrating dbt DAGs is there, including getting logs from the runs - if you are interested it's here

Madison Schott

05/04/2022, 10:27 PM
cool thanks!
👍 1