Zach Munro
03/27/2024, 5:28 PM
import os

from prefect import flow, task
from prefect_dbt import DbtCliProfile, DbtCoreOperation, SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect.context import FlowRunContext


@flow
def trigger_dbt_data_build(client: str, warehouse: str = "COMPUTE_WH"):
    flow_run_name = FlowRunContext.get().flow_run.dict().get("name")
    credentials = SnowflakeCredentials(
        user=os.environ.get("SNOWFLAKE_USER"),
        password=os.environ.get("SNOWFLAKE_PASSWORD"),
        account=os.environ.get("SNOWFLAKE_ACCOUNT"),
        role="ACCOUNTADMIN",
    )
    connector = SnowflakeConnector(
        # schema=f"{client_name.upper()}_STG",
        schema="dbt_zmunro",
        threads=8,
        database="RAW",
        warehouse=warehouse,
        credentials=credentials,
        query_tag=f"dbt-data-build-{client}-{flow_run_name}",
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector,
        extras={
            "retry_on_database_errors": True,
            "connect_retries": 0,
            "connect_timeout": 600,
            "retry_all": False,
            "reuse_connections": False,
        },
    )
    dbt_cli_profile = DbtCliProfile(
        name="prefect-snowflake-dev",
        target="dev",
        target_configs=target_configs,
    )
    return DbtCoreOperation(
        commands=[
            f'dbt build --select +somemodel --vars \'{{"client_schema":"{client.upper()}_STG"}}\''
        ],
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
        project_dir="/workflows/dbt/",
    ).run()
I am getting an error because the DbtCoreOperation can't find my dbt_cli_profile. I am trying to follow the documentation here:
https://prefecthq.github.io/prefect-dbt/cli/credentials/#prefect_dbt.cli.credentials
That documentation doesn't say how to use this dbt_cli_profile object with the DbtCoreOperation class, though. I am also confused about how blocks relate to all of this. Should I not be creating these credential objects on each flow run, and instead create them once and save them in a block? Where does this block get saved if I am not using Prefect Cloud?
Alexander Azzam
03/27/2024, 5:37 PM
Zach Munro
03/27/2024, 5:40 PM
[...] DbtCliProfile object with the DbtCoreOperation class. When I run the above code, I get a runtime error saying: Could not find profile named 'ml-data-dev-snowflake'
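For reference, dbt raises "Could not find profile named ..." when the profile key in the project file names a profile that is missing from the profiles.yml it reads. A minimal sketch for checking which profile name a project expects is below. The helper read_profile_name is hypothetical (it is not part of prefect-dbt or dbt), and it assumes the project file is called dbt_project.yml with profile as a plain top-level key; a real YAML parser would be more robust than this regex.

```python
import re
from pathlib import Path


def read_profile_name(project_dir: str) -> str:
    """Return the value of the top-level `profile:` key in dbt_project.yml.

    Hypothetical helper for illustration only. Assumes `profile` is a simple
    top-level scalar, optionally quoted, optionally followed by a comment.
    """
    text = (Path(project_dir) / "dbt_project.yml").read_text(encoding="utf-8")
    # Match an unindented `profile:` line and capture its value on the same line.
    match = re.search(
        r"^profile:[ \t]*(.+?)[ \t]*(?:#.*)?$", text, flags=re.MULTILINE
    )
    if match is None:
        raise ValueError("no top-level 'profile' key found in dbt_project.yml")
    # Drop surrounding single or double quotes, if any.
    return match.group(1).strip("'\"")
```

The returned string could then be compared with, or passed as, the name= argument of DbtCliProfile, so that the generated profile and the project agree on the profile name.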
Zach Munro
03/27/2024, 5:41 PM
[...] DbtCoreOperation class. So why would it be searching for a profile using the name specified? What else does it need? Shouldn't the DbtCliProfile object I've made have all the profile information needed to run the dbt command?
Zach Munro
03/27/2024, 5:48 PM
[...] DbtCliProfile object to actually write a profiles.yml file out before running the DbtCoreOperation??? Because that'd be super weird; ideally the core operation would just take the class, and if it really needs to be written out to a file, then that class would handle it.
How blocks are related to this whole thing is also a complete mystery to me.
Zach Munro
03/27/2024, 6:04 PM
Zach Munro
03/27/2024, 6:16 PM
[...] dbt_project.yml needs to be set to the same name as the profile created by the DbtCliProfile object. Not clear on whether that can be set programmatically.
Mason Menges
03/27/2024, 6:18 PM
Zach Munro
03/27/2024, 6:20 PM
Mason Menges
03/27/2024, 6:34 PM
import os

from prefect import flow, task
from prefect_dbt import DbtCliProfile, DbtCoreOperation, SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect.context import FlowRunContext


@flow
def trigger_dbt_data_build(client: str, warehouse: str = "COMPUTE_WH"):
    flow_run_name = FlowRunContext.get().flow_run.dict().get("name")
    dbt_cli_profile = DbtCliProfile.load("dbt-profile-block-name")
    return DbtCoreOperation(
        commands=[
            f'dbt build --select +somemodel --vars \'{{"client_schema":"{client.upper()}_STG"}}\''
        ],
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
        project_dir="/workflows/dbt/",
    ).run()
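Side note on the command string used in both snippets: the --vars payload is assembled by hand with escaped braces and quotes, which is easy to get wrong. A minimal sketch of one alternative is below, using json.dumps for the payload and shlex.quote for shell quoting. The helper name build_dbt_build_command is hypothetical, and the sketch assumes the command runs in a POSIX shell.

```python
import json
import shlex


def build_dbt_build_command(client: str, selector: str = "+somemodel") -> str:
    """Assemble a `dbt build` command with a JSON --vars payload.

    Hypothetical helper for illustration; assumes a POSIX shell runs the command.
    """
    # Serialize the variables as JSON instead of escaping braces in an f-string.
    dbt_vars = json.dumps({"client_schema": f"{client.upper()}_STG"})
    # shlex.quote wraps each argument in single quotes only when it needs them.
    return (
        f"dbt build --select {shlex.quote(selector)} "
        f"--vars {shlex.quote(dbt_vars)}"
    )
```

The result could be passed as the single entry of commands=[...] in DbtCoreOperation in place of the hand-written f-string.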
Zach Munro
03/27/2024, 6:53 PM
Mason Menges
03/27/2024, 7:01 PM
Mason Menges
03/27/2024, 7:03 PM
Mason Menges
03/27/2024, 7:04 PM