M. Siddiqui
12/27/2021, 12:46 PM
Kevin Kho
M. Siddiqui
12/27/2021, 2:46 PM
Kevin Kho
manifest.json
into tasks. You can make a function similar to theirs:
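from prefect.tasks.shell import ShellTask

def create_task(node, dbt_verb):
    # Build one task per dbt node; dbt_verb is e.g. "run" or "test"
    shell = ShellTask(...)  # or use the dbt task
    return shell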
and then use it in your Flow:
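from prefect import Flow

# data is the parsed manifest.json
with Flow(...) as flow:
    dbt_tasks = {}
    for node in data["nodes"].keys():
        # Node IDs are prefixed with their resource type, e.g. "model."
        if node.split(".")[0] == "model":
            # Key for a matching test task (unused in this snippet)
            node_test = node.replace("model", "test")
            dbt_tasks[node] = create_task(node, "run")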
That gives you a dictionary of dbt tasks. You then need to set the upstream tasks, which will be similar to their code, except you can do:
dbt_tasks[node].set_upstream(dbt_tasks[upstream_node])
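To make the upstream step concrete, here is a rough sketch of pulling the model-to-model edges out of the manifest before wiring them up. It assumes each node lists its parents under `depends_on` -> `nodes`, which is how dbt's manifest.json is laid out as far as I know. The helper name `model_upstreams` and the sample manifest are made up for illustration.

```python
def model_upstreams(manifest):
    """Map each model node to the model nodes it depends on."""
    nodes = manifest["nodes"]
    edges = {}
    for node, info in nodes.items():
        if node.split(".")[0] != "model":
            continue
        upstream = info.get("depends_on", {}).get("nodes", [])
        # Keep only upstream models; sources and other resource types
        # have no task in dbt_tasks, so they are skipped here.
        edges[node] = [
            u for u in upstream
            if u.split(".")[0] == "model" and u in nodes
        ]
    return edges


# Made-up manifest fragment, for illustration only.
sample_manifest = {
    "nodes": {
        "model.my_project.stg_orders": {
            "depends_on": {"nodes": ["source.my_project.raw.orders"]}
        },
        "model.my_project.orders": {
            "depends_on": {"nodes": ["model.my_project.stg_orders"]}
        },
        "test.my_project.not_null_orders_id": {
            "depends_on": {"nodes": ["model.my_project.orders"]}
        },
    }
}

edges = model_upstreams(sample_manifest)

# Inside the Flow block, the wiring would then look like:
# for node, upstream_nodes in edges.items():
#     for upstream_node in upstream_nodes:
#         dbt_tasks[node].set_upstream(dbt_tasks[upstream_node])
```

Filtering to models first means every key you look up in `dbt_tasks` actually exists, so you avoid a KeyError on sources or tests that never got a task.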
M. Siddiqui
12/27/2021, 3:05 PM
Aram Panasenco
12/27/2021, 7:53 PM
M. Siddiqui
12/28/2021, 10:32 AM