Ken Nguyen
04/14/2022, 11:28 PM
Kevin Kho
Ken Nguyen
04/14/2022, 11:29 PM
ValueError: 'dbt' exists and is not an empty directory
Kevin Kho
return_all=True maybe for DbtShellTask? But I think what you have should be enough. What is your executor for this Flow? That affects if the tasks run on separate environments, but LocalExecutor and LocalDask will share the same filesystem.
pygit2's clone_repository takes in a local path. I think you can just use that to rename the destination folder?
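For illustration, a minimal sketch of the suggestion above, with a placeholder repo URL (pygit2.clone_repository's second argument is the local destination path, so cloning each branch into its own folder avoids the "'dbt' exists and is not an empty directory" error from earlier):

import pygit2

repo_url = "https://github.com/example/analytics"  # placeholder URL
branch_name = "feature_branch"

# the second argument is the local path to clone into, so a
# per-branch folder keeps repeated or parallel clones from colliding
repo = pygit2.clone_repository(repo_url, branch_name)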
Ken Nguyen
04/14/2022, 11:38 PM
Kevin Kho
Ken Nguyen
04/15/2022, 12:15 AM
@task(log_stdout=True)
def run_dbt(schema_name, branch_name, command):
    DbtShellTask(
        return_all=True,
        profile_name="drsquatch_dev",
        environment="prod",
        overwrite_profiles=True,
        log_stdout=True,
        helper_script="cd {}/dbt".format(branch_name),
        log_stderr=True,
        stream_output=True,
        dbt_kwargs={
            "type": "snowflake",
            "account": secret['SNOWFLAKE_ACCOUNT'],
            "user": secret['SNOWFLAKE_USERNAME'],
            "password": secret['SNOWFLAKE_PASSWORD'],
            "role": "PREFECT_DATA_QUAL_TRCK_ROLE",
            "database": "ANALYTICS_DEV",
            "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
            "schema": schema_name,
            "threads": 12,
            "client_session_keep_alive": False,
        },
    ).run(command=command)
##============
With log_stdout=True, log_stderr=True, stream_output=True, do you see any reason why I wouldn’t be seeing logs? I’m unsure if it’s a mapped run thing or a DbtShellTask wrapped in another task thing
Kevin Kho
Ken Nguyen
04/15/2022, 12:22 AM
Kevin Kho
Ken Nguyen
04/15/2022, 12:38 AM
Kevin Kho
Ken Nguyen
04/15/2022, 12:42 AM
Kevin Kho
class MyDbtShellTask(DbtShellTask):
    @defaults_from_attrs("command", "env", "helper_script", "dbt_kwargs")
    def run(
        self,
        command: str = None,
        env: dict = None,
        helper_script: str = None,
        dbt_kwargs: dict = None,
        path_to_clone: str = None
    ) -> str:
        pygit2.clone_repository(.., path_to_clone, ..)
        return super().run(command, env, helper_script, dbt_kwargs)
You can use path_to_clone for example before feeding the rest of the kwargs to DbtShellTask.run() by using the super()
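A minimal sketch of how the new path_to_clone argument might then be supplied at call time (the flow name, parameter, and init kwargs here are illustrative, not from the thread):

from prefect import Flow, Parameter

clone_and_run = MyDbtShellTask(
    profile_name="my_profile",  # illustrative init kwargs
    overwrite_profiles=True,
)

with Flow("dbt-subclass-demo") as flow:
    path = Parameter("path_to_clone", default="dbt_checkout")
    # init kwargs become defaults via defaults_from_attrs;
    # path_to_clone is a pure run-time input
    clone_and_run(command="dbt run", path_to_clone=path)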
Ken Nguyen
04/15/2022, 12:52 AM
Kevin Kho
It calls super().run() at the end
Ken Nguyen
04/15/2022, 1:17 AM
Kevin Kho
Ken Nguyen
04/15/2022, 2:13 AM
I also don’t quite understand how @defaults_from_attrs and the ) -> str: annotation work, or how MyDbtShellTask is able to take in new inputs
edit: found this to read about the decorator https://docs.prefect.io/api/latest/utilities/tasks.html
@task(log_stdout=True)
def run_dbt(schema_name, branch_name, command):
    DbtShellTask(
        return_all=True,
        profile_name="drsquatch_dev",
        environment="prod",
        overwrite_profiles=True,
        log_stdout=True,
        helper_script="cd {}/dbt".format(branch_name),
        log_stderr=True,
        stream_output=True,
        dbt_kwargs={
            "type": "snowflake",
            "account": secret['SNOWFLAKE_ACCOUNT'],
            "user": secret['SNOWFLAKE_USERNAME'],
            "password": secret['SNOWFLAKE_PASSWORD'],
            "role": "PREFECT_DATA_QUAL_TRCK_ROLE",
            "database": "ANALYTICS_DEV",
            "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
            "schema": schema_name,
            "threads": 12,
            "client_session_keep_alive": False,
        },
    ).run(command=command)
It’s able to take in branch_name and format that into the helper_script parameter. And then it’s able to take in schema_name and add the value to the dbt_kwargs dictionary. I can’t quite understand how to emulate the same thing when subclassing DbtShellTask
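To illustrate how the decorator ties init kwargs to run() arguments, here is a minimal, self-contained sketch (GreetTask and its arguments are invented for the example):

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs

class GreetTask(Task):
    def __init__(self, greeting="hello", **kwargs):
        self.greeting = greeting  # stored at init, used as the fallback
        super().__init__(**kwargs)

    @defaults_from_attrs("greeting")
    def run(self, greeting=None, name=None):
        # when run() is called with greeting=None, the decorator swaps
        # in self.greeting; name has no init counterpart, so it behaves
        # like branch_name/schema_name above: a pure run-time input
        return "{}, {}".format(greeting, name)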
Kevin Kho
pg = PostgresExecute(db=..., pw=...)
with Flow(...) as flow:
    pg()
and then pg() will use the init kwargs
Ken Nguyen
04/15/2022, 2:48 AM
Kevin Kho
Adding them to run as additional inputs will suffice. Then you don’t need them in defaults_from_attrs because there is no default from the init. On super().run(): the DbtShellTask has a ton of logic and then it calls the ShellTask.run() at the end
Ken Nguyen
04/15/2022, 3:00 AM
Kevin Kho
Say we have schema_name with some logic and we want to inject that in dbt_kwargs, and that motivates our new task
class MyDbtShellTask(DbtShellTask):
    def run(self, command=None, env=None, helper_script=None, dbt_kwargs=None, schema_name_suffix: str = None):
        dbt_kwargs['schema_name'] = dbt_kwargs['schema_name'] + schema_name_suffix
        return super().run(command, env, helper_script, dbt_kwargs)
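A sketch of how this version could be called, with illustrative values, to show the suffix flowing into dbt_kwargs before DbtShellTask takes over (note dbt_kwargs is passed at call time here, since this version has no defaults_from_attrs to fall back on init values):

from prefect import Flow

suffixed_dbt = MyDbtShellTask(
    profile_name="my_profile",  # illustrative init kwargs
    overwrite_profiles=True,
)

with Flow("suffix-demo") as flow:
    # inside run(), dbt_kwargs["schema_name"] becomes "analytics_dev"
    # before super().run() hands everything to DbtShellTask
    suffixed_dbt(
        command="dbt run",
        dbt_kwargs={"schema_name": "analytics"},
        schema_name_suffix="_dev",
    )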
Ken Nguyen
04/15/2022, 3:09 AM
Kevin Kho
Ken Nguyen
04/15/2022, 5:25 PM
Kevin Kho
Ken Nguyen
04/15/2022, 5:50 PM
Kevin Kho
flow.executor = LocalDaskExecutor()
and it should run in parallel
Ken Nguyen
04/16/2022, 12:26 AM
with Flow("data-quality-tracking-parallel-dbt-run-flow", run_config=RUN_CONFIG, storage=STORAGE, executor=LocalDaskExecutor()) as flow:
    branch_name = Parameter('branch_name', default=["branch1", "branch2"], required=False)
    schema_name = Parameter('schema_name', default=["schema1", "schema2"], required=False)
Do parallel runs take elements from each parameter at the same index? In other words, will flow runs always have branch1 be run with schema1, or is it random and I might get branch1 be run with schema2 as well?
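For what it’s worth, a minimal sketch showing that mapped arguments are zipped positionally (the task and values are invented for the demo):

from prefect import Flow, task

@task
def pair(branch, schema):
    return "{} -> {}".format(branch, schema)

with Flow("map-pairing-demo") as demo_flow:
    out = pair.map(
        branch=["branch1", "branch2"],
        schema=["schema1", "schema2"],
    )
# the children are ("branch1", "schema1") and ("branch2", "schema2"),
# paired by index rather than crossed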
Kevin Kho
Ken Nguyen
04/16/2022, 10:58 PM
One should say table model *heap_change*.subscription_renewals and the other should say table model *heap_stage*.subscription_renewals. But as you can see, each task in both flows seems to just pick 1, and the picking seems random
Kevin Kho
Ken Nguyen
04/16/2022, 11:05 PM
##=========================================##
##================ CONFIGS ================##
##=========================================##
import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.utilities.tasks import defaults_from_attrs
from prefect.executors import LocalDaskExecutor
from prefect.storage import GitHub
from prefect.run_configs import ECSRun
from prefect.client import Secret
import pygit2

STORAGE = GitHub(repo="dr-squatch/prefect",
                 path="/flows/data_quality_tracking/data_quality_tracking_parallel_dbt_run_flow.py",
                 access_token_secret="GITHUB_ACCESS_TOKEN")
RUN_CONFIG = ECSRun(labels=['dev'],
                    task_role_arn=Secret("ECS_TASK_ROLE_ARN").get(),
                    execution_role_arn=Secret("ECS_EXECUTION_ROLE_ARN").get(),
                    image="kendrsq/prefect-dbt:latest")
logger = prefect.context.get("logger")
##=========================================##
##=========================================##

##=========================================##
##=========== PULL_DBT_REPO TASK ==========##
##=========================================##
@task(name="Clone DBT")
def pull_dbt_repo(branch_name, schema_name):
    git_token = Secret("GITHUB_ACCESS_TOKEN").get()
    dbt_repo_name = "analytics"
    dbt_repo_https = (
        f"https://{git_token}:x-oauth-basic@github.com/dr-squatch/{dbt_repo_name}"
    )
    dbt_repo = pygit2.clone_repository(dbt_repo_https, branch_name)
    branch = dbt_repo.branches['origin/' + branch_name]
    # logger.info("Pulling the {} branch from analytics repo".format(branch_name))
    logger.info("[PULL_REPO] Running dbt task from the {} branch, into the ANALYTICS_DEV.{} schema".format(branch_name, schema_name))
    ref = dbt_repo.lookup_reference(branch.name)
    dbt_repo.checkout(ref)
    logger.info("Cloning of the {} branch complete".format(branch_name))
##=========================================##
##=========================================##

##=========================================##
##=========== DBTSHELLTASK SETUP ==========##
##=========================================##
from prefect.tasks.dbt.dbt import DbtShellTask

secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()

class MyDbtShellTask(DbtShellTask):
    @defaults_from_attrs("command", "env", "helper_script", "dbt_kwargs")
    def run(
        self,
        command: str = None,
        env: dict = None,
        helper_script: str = None,
        dbt_kwargs: dict = None,
        branch_name: str = None,
        schema_name: str = None,
        # Above 2 arguments are additional arguments to be added on top of regular DbtShellTask arguments
    ) -> str:
        helper_script = "cd {}/dbt".format(branch_name)
        dbt_kwargs['schema'] = schema_name
        logger.info("[DBT] Running dbt task from the {} branch, into the ANALYTICS_DEV.{} schema".format(branch_name, schema_name))
        return super().run(command=command, env=env, helper_script=helper_script, dbt_kwargs=dbt_kwargs)

run_dbt = MyDbtShellTask(
    return_all=True,
    profile_name="drsquatch_dev",
    environment="prod",
    overwrite_profiles=True,
    log_stdout=True,
    log_stderr=True,
    stream_output=True,
    dbt_kwargs={
        "type": "snowflake",
        "account": secret['SNOWFLAKE_ACCOUNT'],
        "user": secret['SNOWFLAKE_USERNAME'],
        "password": secret['SNOWFLAKE_PASSWORD'],
        "role": "PREFECT_DATA_QUAL_TRCK_ROLE",
        "database": "ANALYTICS_DEV",
        "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
        "threads": 12,
        "client_session_keep_alive": False,
    },
)
##=========================================##
##=========================================##

##=========================================##
##============= OUTPUT_PRINT ==============##
##=========================================##
@task
def output_print(output):
    logger.info(output)
##=========================================##
##=========================================##

##=========================================##
##================= FLOW ==================##
##=========================================##
with Flow("data-quality-tracking-parallel-dbt-run-flow", run_config=RUN_CONFIG, storage=STORAGE, executor=LocalDaskExecutor()) as flow:
    branch_name = Parameter('branch_name', default=["parallel_testing_branch", "parallel_testing_branch2"], required=False)
    schema_name = Parameter('schema_name', default=["heap_stage", "heap_change"], required=False)
    dbt_command = Parameter('dbt_command', default="dbt run -m subscriptions")
    # output_print.map("The inputted branch is {} and the inputted schema is {}".format(branch_name, schema_name))
    pull_repo = pull_dbt_repo.map(
        schema_name=schema_name,
        branch_name=branch_name
    )
    deps = run_dbt.map(
        schema_name=schema_name,
        branch_name=branch_name,
        command=unmapped("dbt deps"),
        task_args={"name": "DBT: Dependencies"},
        upstream_tasks=[pull_repo],
    )
    run_model = run_dbt.map(
        schema_name=schema_name,
        branch_name=branch_name,
        command=unmapped(dbt_command),
        task_args={"name": "DBT: Run model"},
        upstream_tasks=[deps],
    )
##=========================================##
##=========================================##

flow.register(project_name="data_quality_tracking")
Kevin Kho
upstream_tasks=pull_repo with no bracket?
Ken Nguyen
04/16/2022, 11:10 PM
upstream_tasks=deps without brackets as well?
Kevin Kho
Ken Nguyen
04/16/2022, 11:12 PM
for t in upstream_tasks or []:
File "/Users/kennguyen/Documents/prefect/master_data_qual_env/lib/python3.9/site-packages/prefect/core/task.py", line 998, in __iter__
    raise TypeError(
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
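For reference, this traceback is what happens when a bare Task lands where Prefect expects an iterable of tasks: the for t in upstream_tasks loop tries to iterate the Task object itself. A sketch of the bracketed form that avoids it, using the names from the flow above:

deps = run_dbt.map(
    schema_name=schema_name,
    branch_name=branch_name,
    command=unmapped("dbt deps"),
    # wrapping the upstream task in a list keeps Prefect from
    # iterating the Task object directly
    upstream_tasks=[pull_repo],
)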
Kevin Kho
dbt_kwargs, but I am not seeing a second passed in?
Ken Nguyen
04/16/2022, 11:15 PM
In class MyDbtShellTask(DbtShellTask), I got a dbt_kwargs['schema'] = schema_name in there
Kevin Kho
Ken Nguyen
04/16/2022, 11:21 PM
Kevin Kho
deps = run_dbt.map(
    schema_name=schema_name,
    branch_name=branch_name,
    command=unmapped("dbt deps"),
    upstream_tasks=pull_repo,
)
Ken Nguyen
04/16/2022, 11:24 PM
Kevin Kho
Ken Nguyen
04/16/2022, 11:29 PM
Kevin Kho