    Madison Schott

    4 months ago
    Does anyone have Slack alerts set up with dbt tests on Prefect? Is it possible to send an alert with dbt source tests that are warning or failed?
    Anna Geller

    4 months ago
    You could do something like this:
    @task(trigger=any_failed)
    def send_slack_alert_on_failure(output):
        SlackTask(message=output).run()
    this post provides a full example
    Madison Schott

    4 months ago
    Ok, this could potentially work. However, with the test command I get both passing and warning tests. Is there a way to only send the dbt output messages that include "warn"?
    17:56:23  8 of 17 PASS freshness of google_sheets.misc_marketing_spend_winc_influencers... [PASS in 4.13s]
    10:56:24
    INFO
    DbtShellTask
    17:56:24  7 of 17 WARN freshness of google_sheets.misc_marketing_spend_winc............... [WARN in 4.39s]
    EX of PASS vs WARN ^
    Anna Geller

    4 months ago
    You would somehow need to filter by the log message content. It would be possible to do with something like Grafana: you could send all your dbt logs to some log aggregation service and then configure alerts based on message content. Alternative: this could be a bit hacky, but you could do something like this in your flow:
    @task(trigger=any_failed)
    def send_slack_alert_on_failure(output):
        for line in output:
            if "WARN" in line:
                SlackTask(message=line).run()
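
A minimal sketch of the filtering idea above, assuming the dbt output is already available as a list of log lines in the format shown earlier in this thread. The helper name `warn_or_fail_lines` and the status pattern are illustrative assumptions, not part of Prefect or dbt; in a flow, the returned lines would be what gets handed to the Slack task.

```python
import re

# Matches the trailing status dbt prints for each test, e.g. "[WARN in 4.39s]".
# The pattern is an assumption based on the sample log lines in this thread.
STATUS_RE = re.compile(r"\[(PASS|WARN|FAIL|ERROR)\b[^\]]*\]\s*$")


def warn_or_fail_lines(lines, statuses=("WARN", "FAIL", "ERROR")):
    """Return only the log lines whose trailing status is in `statuses`."""
    selected = []
    for line in lines:
        match = STATUS_RE.search(line)
        if match and match.group(1) in statuses:
            selected.append(line)
    return selected


# The two sample lines quoted earlier in the thread.
sample = [
    "17:56:23  8 of 17 PASS freshness of google_sheets.misc_marketing_spend_winc_influencers... [PASS in 4.13s]",
    "17:56:24  7 of 17 WARN freshness of google_sheets.misc_marketing_spend_winc............... [WARN in 4.39s]",
]
print(warn_or_fail_lines(sample))
```

Matching on the bracketed status at the end of the line, rather than on the substring "WARN" anywhere, avoids false positives when a source or model name happens to contain that word.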
    Madison Schott

    4 months ago
    getting this error when adding that code, what's the best way to debug this?
    Beginning health checks...
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 152, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
    TypeError: code() takes at most 15 arguments (16 given)
    Anna Geller

    4 months ago
    try adding checkpoint=False to the task decorator on this task, otherwise the task run result may not get serialized properly
    Madison Schott

    4 months ago
    hmm still getting the error
    @task(trigger=any_failed, checkpoint=False)
    def send_slack_alert_on_test(output):
        SlackTask(message=output).run()
    Anna Geller

    4 months ago
    it actually looks like an error from Docker storage - you could disable the health checks on your Docker storage using ignore_healthchecks=True, but the health checks are there for a reason: it looks like something in your flow code cannot be serialized with cloudpickle. If you need help debugging this, can you share your flow code?
    the full flow code - you can also share via DM
    this is weird; your code looks fine! It could be some version mismatch between the base image you defined in the Dockerfile and the Python version used in your flow registration environment - can you cross-check whether those versions match?
    apart from that, those two tasks seem to be disconnected from the rest of your flow - just letting you know, it's likely you made that on purpose
    Madison Schott

    4 months ago
    I did get an error about the Python versions. However, the code works despite that error when I don't include that task, so I figured it was something else
    Anna Geller

    4 months ago
    I see, perhaps worth matching the Python versions and trying again? doesn't hurt for sure
    Madison Schott

    4 months ago
    Yeah this is just in dev, in prod I will set a dependency between the syncs and dbt tests
    what's the best way to specify python version with prefect dockerfile?
    Anna Geller

    4 months ago
    you would set it in your base image
    Madison Schott

    4 months ago
    I don't currently specify any python version there
    not having any luck with this
    FROM python:3.9-slim
    RUN python -m pip install
    this is my base image, isn't this created on Prefect's end?
    # specify a base image
    FROM prefecthq/prefect:latest
    Got it to work!
    Unexpected error: AttributeError("'FunctionTask' object has no attribute 'retry_on'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 1000, in check_for_retry
        self.task.retry_on
    AttributeError: 'FunctionTask' object has no attribute 'retry_on'
    seeing this error when running in prod ^
    Also getting a bad request for the url although it's the one generated for Prefect for my channel
    Anna Geller

    4 months ago
    retry_on is a recently added argument to the Task class, which makes me believe there is still some version mismatch between your registration and your prod runtime environments. Since the traceback shows the task runner in the runtime environment reading self.task.retry_on, it looks like your runtime/production environment is on a more recent version of Prefect that expects this argument, while your registration/build environment created the task with an older version of Prefect that doesn't yet have retry_on
    Madison Schott

    4 months ago
    how do I update these environments so that they match?
    is there something in my dashboard I need to change? or is it with my Docker container? my file?
    Anna Geller

    4 months ago
    I believe if you e.g. want the most recent Prefect version in both environments, you could do:
    pip install prefect --upgrade
    in both
    Madison Schott

    4 months ago
    locally and where else?
    Anna Geller

    4 months ago
    your registration environment (probably your development machine or CI system) and the Dockerfile if you explicitly set the image version there. If you don't use Dockerfile, Prefect will try to match the Prefect version with your environment
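
One way to compare the two environments is to print the same version summary in each and look for differences. This is a rough sketch using only the standard library; the function names `environment_fingerprint` and `mismatches` are hypothetical, and the choice of packages to check (prefect, cloudpickle) is an assumption based on the errors in this thread.

```python
import sys
from importlib import metadata


def environment_fingerprint(packages=("prefect", "cloudpickle")):
    """Return the Python major.minor version and installed versions of `packages`."""
    versions = {"python": f"{sys.version_info.major}.{sys.version_info.minor}"}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # package is not installed in this environment
    return versions


def mismatches(first, second):
    """List the keys whose versions differ between two fingerprints."""
    return sorted(
        key for key in first.keys() | second.keys() if first.get(key) != second.get(key)
    )


# Run this where the flow is registered and again inside the container image.
print(environment_fingerprint())
```

Passing the two printed dictionaries to `mismatches` would list the entries that need to be aligned, for example by pinning the same versions on the development machine and in the Dockerfile.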
    Madison Schott

    4 months ago
    For some reason I'm not receiving the messages with "WARN". Will printing out the line here print to the Prefect logs?
    @task(trigger=all_finished, checkpoint=False)
    def send_slack_alert_on_test(output):
        for line in output:
            if "WARN" in line:
                print(line)
                SlackTask(message=line).run()
    I did this but nothing is being printed:
    @task(trigger=all_finished, checkpoint=False, log_stdout=True)
    def send_slack_alert_on_test(output):
        for line in output:
            if "WARN" in line:
                print(line)
                SlackTask(message=line).run()
    Anna Geller

    4 months ago
    will printing out the line here print to the Prefect logs?
    A really good question! I think it should work, as long as you add log_stdout=True to your task decorator (which you did) 🤔 Alternatively, you could do the same using the logger, which is safer:
    logger = prefect.context.get("logger")
    for line in output:
        if "WARN" in line:
            logger.info(line)
    Madison Schott

    4 months ago
    hmmm yeah nothing is printing out
    any ideas why?
    Anna Geller

    4 months ago
    honestly no, I don't. Can you try with a simple flow and a simple dbt DAG and try to tackle it more step by step? does this work without filtering for WARN messages only?
    Madison Schott

    4 months ago
    no it doesn't
    so it looks like output is just that last message in a task run
    so it doesn't actually give you each line
    def send_slack_alert_on_test(output):
        print(output)
    so this would only print the "20:35:55  Done." message
    so I guess is there a way to send all the messages from a dbt run to this function rather than just that last message?
    Anna Geller

    4 months ago
    Yes there is! you would need to do that instead:
    logger = prefect.context.get("logger")
    if "WARN" in output:
        logger.info(output)  # or put it into Slack task
    basically not iterating over each line of output would give you the entire output at once
    Madison Schott

    4 months ago
    when I just print the entire output I only get that last message
    I think I found the solution in the docs: I need to add return_all=True to my dbt task
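
This also explains why the earlier loop never matched: without return_all=True the task result is a single string holding only the last line, so iterating over it yields individual characters, none of which contains "WARN". A minimal sketch of a helper that tolerates both shapes; the name `build_alert` is hypothetical and the sample lines are made up for illustration:

```python
def build_alert(output, keyword="WARN"):
    """Collect lines containing `keyword` from a string or a list of lines."""
    # A plain string is split into lines first; iterating over it directly
    # would yield single characters instead of log lines.
    lines = output.splitlines() if isinstance(output, str) else list(output)
    hits = [line.strip() for line in lines if keyword in line]
    # None signals that there is nothing worth sending.
    return "\n".join(hits) if hits else None


# Shape seen without return_all=True: only the last line comes back.
print(build_alert("20:35:55  Done."))

# Shape with return_all=True: a list with one entry per log line.
print(build_alert([
    "17:56:23  8 of 17 PASS freshness of example_source_a [PASS in 4.13s]",
    "17:56:24  7 of 17 WARN freshness of example_source_b [WARN in 4.39s]",
]))
```

Returning one joined message instead of posting per line keeps the alert to a single Slack message per run, and the None result makes it easy to skip the Slack call when no warnings were found.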
    Anna Geller

    4 months ago
    that's true, I thought you have that already
    have you seen my dbt blog post series? I'm no big dbt user but pretty much most of what I know about orchestrating dbt DAGs is there, including getting logs from the runs - if you are interested it's here
    Madison Schott

    4 months ago
    cool thanks!