# ask-community
z
Hey, I am having trouble running a dbt Prefect flow, and the documentation around this is very confusing, bordering on misleading and incomplete. Here is my flow:
import os
from prefect import flow, task
from prefect_dbt import DbtCliProfile, DbtCoreOperation, SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect.context import FlowRunContext

@flow
def trigger_dbt_data_build(client: str, warehouse: str = "COMPUTE_WH"):
    flow_run_name = FlowRunContext.get().flow_run.dict().get("name")
    credentials = SnowflakeCredentials(
        user=os.environ.get("SNOWFLAKE_USER"),
        password=os.environ.get("SNOWFLAKE_PASSWORD"),
        account=os.environ.get("SNOWFLAKE_ACCOUNT"),
        role="ACCOUNTADMIN",
    )
    connector = SnowflakeConnector(
        # schema=f"{client_name.upper()}_STG",
        schema="dbt_zmunro",
        threads=8,
        database="RAW",
        warehouse=warehouse,
        credentials=credentials,
        query_tag=f"dbt-data-build-{client}-{flow_run_name}",
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector,
        extras={
            "retry_on_database_errors": True,
            "connect_retries": 0,
            "connect_timeout": 600,
            "retry_all": False,
            "reuse_connections": False,
        },
    )
    dbt_cli_profile = DbtCliProfile(
        name="prefect-snowflake-dev",
        target="dev",
        target_configs=target_configs,
    )
    return DbtCoreOperation(
        commands=[
            f'dbt build --select +somemodel --vars \'{{"client_schema":"{client.upper()}_STG"}}\''
        ],
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
        project_dir="/workflows/dbt/",
    ).run()
I am getting an error because the DbtCoreOperation can't find my dbt_cli_profile correctly. I am trying to follow the documentation here: https://prefecthq.github.io/prefect-dbt/cli/credentials/#prefect_dbt.cli.credentials That documentation doesn't say how to use this dbt_cli_profile object with the DbtCoreOperation class, though. And I am confused as to how blocks relate to all of this as well. Should I not be creating these credential objects on each flow run, and instead just do it once and save it in a block? Where does this block get saved if I am not using Prefect Cloud?
a
I know @Rob Freedy and @Mason Menges have been working pretty closely with our dbt integration?
z
It'd be great if they could help me with this. It is really not clear how to use the DbtCliProfile object with the DbtCoreOperation class. When I run the above code I get a runtime error saying Could not find profile named 'ml-data-dev-snowflake'. That doesn't make any sense to me, though, because I am creating a dbt_cli_profile, naming it, and then passing it directly to the DbtCoreOperation class. So why would it be searching for a profile by name at all? What else does it need? Shouldn't the DbtCliProfile object I've made have all the profile information needed to run the dbt command?
Do you need to use the DbtCliProfile object to actually write a profiles.yml file out before running the DbtCoreOperation? Because that'd be super weird; ideally the core operation would just take the class, and if it really is necessary to write that out to a file, then that class would handle it. How blocks are related to this whole thing is a complete mystery to me as well.
I added my imports to the code in my original post because I thought maybe they were causing an issue
Okay, I figured it out. What was causing the issue was that the profile name in dbt_project.yml needs to be set to the same name as the profile created by the DbtCliProfile object. Not clear on whether that can be set programmatically.
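In other words, the profile key in dbt_project.yml has to point at the same name the DbtCliProfile uses. A minimal sketch (the project name here is hypothetical; only the profile key matters for this error):

# dbt_project.yml (sketch)
name: "my_dbt_project"            # hypothetical project name
profile: "prefect-snowflake-dev"  # must match DbtCliProfile(name="prefect-snowflake-dev")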
m
Brilliant! Took me a second to get to looking through this. Yeah, the docs around this could definitely be clearer on exactly what you're looking for. The error actually makes sense then, since the profile name referenced in dbt_project.yml was different from the one specified on the block.
z
Okay, but the concept of a "block" is still super unclear to me. What "block" are you referring to? When would I need to create a block, and what is the benefit of creating one? I get that they are meant for storing credentials, but where do they get stored? Are they just meant for passing around credentials at run time in case multiple tasks need the DbtCliProfile object?
m
Blocks are, like you mentioned, configuration objects with some additional methods built in. They can be stored in your Prefect account to be referenced across multiple flows/tasks and retrieved at run time. That's not to say you can't instantiate them in code and use them that way, but they're really meant to be reusable and to build on top of each other, hence the name blocks. For example, the code you have above would look like this if the block objects were stored in Prefect. Essentially you'd build each block the same way you did in code, referencing the previous block as you move through the different objects; calling .save() on those blocks in code also saves them to our API so they can be referenced later.
from prefect import flow
from prefect.context import FlowRunContext
from prefect_dbt import DbtCliProfile, DbtCoreOperation

@flow
def trigger_dbt_data_build(client: str, warehouse: str = "COMPUTE_WH"):
    flow_run_name = FlowRunContext.get().flow_run.dict().get("name")
    dbt_cli_profile = DbtCliProfile.load("dbt-profile-block-name")

    return DbtCoreOperation(
        commands=[
            f'dbt build --select +somemodel --vars \'{{"client_schema":"{client.upper()}_STG"}}\''
        ],
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
        project_dir="/workflows/dbt/",
    ).run()
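A one-time registration script for those blocks (a sketch only, run outside the flow; the block names below, like "dbt-profile-block-name", are placeholders, and the values just mirror the original flow) could look roughly like this:

import os
from prefect_dbt import DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

# Build the same nested objects as in the original flow...
credentials = SnowflakeCredentials(
    user=os.environ.get("SNOWFLAKE_USER"),
    password=os.environ.get("SNOWFLAKE_PASSWORD"),
    account=os.environ.get("SNOWFLAKE_ACCOUNT"),
    role="ACCOUNTADMIN",
)
connector = SnowflakeConnector(
    schema="dbt_zmunro",
    database="RAW",
    warehouse="COMPUTE_WH",
    credentials=credentials,
)
target_configs = SnowflakeTargetConfigs(connector=connector)
dbt_cli_profile = DbtCliProfile(
    name="prefect-snowflake-dev",   # must match the profile named in dbt_project.yml
    target="dev",
    target_configs=target_configs,
)

# ...then save each block to the Prefect API under a (placeholder) name so it
# can be loaded later, e.g. DbtCliProfile.load("dbt-profile-block-name").
credentials.save("snowflake-credentials", overwrite=True)
connector.save("snowflake-connector", overwrite=True)
target_configs.save("snowflake-target-configs", overwrite=True)
dbt_cli_profile.save("dbt-profile-block-name", overwrite=True)

Once that has run against your Prefect API, the flow above can just call DbtCliProfile.load("dbt-profile-block-name") on every run instead of rebuilding the credentials each time.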
z
So are blocks only a feature of Prefect Cloud?
m
Nope, they're usable in OSS as well; you can load and save them provided you have a server instance set up.
There's a lot to parse through in there, but all the concepts around blocks (https://docs.prefect.io/latest/concepts/blocks/) still apply to OSS.
What I mentioned before, and what your original code is essentially doing without saving the blocks, is explained under the nested blocks section.
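For a self-hosted setup, that's roughly the following (a sketch; exact commands depend on your Prefect 2.x version, and older releases use prefect orion start instead of prefect server start):

# start a local Prefect API server + UI (defaults to http://127.0.0.1:4200)
prefect server start

# in another shell, point the client at that server
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"

# blocks saved with .save() are stored by that server and listed here
prefect block ls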