matta
02/18/2021, 7:30 PM@task(trigger=all_finished)
def get_filepaths(dbt_path: str = "/root/dbt-repo/target") -> t.List[pathlib.PosixPath]:
return list(pathlib.Path(f"{dbt_path}/compiled").rglob("*.sql"))
def make_query_name(path: pathlib.PosixPath) -> str:
query_name = str(path).split("/compiled")[1]
return f"# {query_name}"
def make_sql_markdown(sql: str) -> str:
return f"
sql\n{sql}\n```"
@task(trigger=all_finished)
def publish_artifact(filepaths: t.List[pathlib.PosixPath]) -> None:
titles_and_queries = [
"\n".join((make_query_name(path), make_sql_markdown(path.read_text())))
for path in filepaths
]
all_merged = "\n\n".join(titles_and_queries)
create_markdown(all_merged)```George Coyne
02/18/2021, 7:33 PMDylan
02/18/2021, 7:35 PMale
02/18/2021, 7:48 PM