Dileep Damodaran
05/31/2022, 10:49 AM

Anna Geller
05/31/2022, 11:24 AM

Dileep Damodaran
05/31/2022, 11:31 AM

Kevin Kho
05/31/2022, 2:27 PM

Mateo Merlo
05/31/2022, 3:16 PM
```python
@task
def get_param_env_value(target_env):
    return f"{target_env}"


with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    task_param_env = get_param_env_value(target_env)
    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        task_args={"name": "DBT Run", "environment": task_param_env},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
```
```python
@task
def get_param_env_value(param):
    logger = prefect.context.get("logger")
    parameters = prefect.context.get('parameters')
    logger.info(parameters[param])
    return f"{parameters[param]}"
```
The printed value is `dev`, which is fine, but in my profiles.yml I still get `!!python/object:prefect.tasks.core.function.FunctionTask` instead of the `dev` string.

Kevin Kho
05/31/2022, 3:53 PM
… `environment` does not. It’s the things that go in the task decorator, like `checkpoint` and `trigger` … `get_param_env_value`. Can you show me how `dbt()` was defined there?

Mateo Merlo
05/31/2022, 3:56 PM
```python
import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.dbt import DbtShellTask

# credentials, GCP_BQ_PROJECT, GCP_BQ_DATASET, pull_dbt_repo and print_dbt_output
# are defined elsewhere in the module (not shown in this snippet)
dbt = DbtShellTask(
    return_all=True,
    profile_name="dbt_profile",
    profiles_dir="dbt2",
    overwrite_profiles=True,
    log_stdout=True,
    helper_script="cd dbt2",
    log_stderr=True,
    dbt_kwargs={
        "dataset": 'dbt_staging',
        "keyfile_json": {
            "type": credentials['type'],
            "project_id": credentials['project_id'],
            "private_key_id": credentials['private_key_id'],
            "private_key": credentials['private_key'],
            "client_email": credentials['client_email'],
            "client_id": credentials['client_id'],
            "auth_uri": credentials['auth_uri'],
            "token_uri": credentials['token_uri'],
            "auth_provider_x509_cert_url": credentials['auth_provider_x509_cert_url'],
            "client_x509_cert_url": credentials['client_x509_cert_url'],
        },
        "location": 'EU',
        "method": "service-account-json",
        "priority": "interactive",
        "project": GCP_BQ_PROJECT,
        "threads": 4,
        "timeout_seconds": 300,
        "type": "bigquery",
    },
)


@task
def format_command(target_env):
    return f"dbt run --profiles-dir . --target {target_env} "


@task
def get_param_env_value(param):
    logger = prefect.context.get("logger")
    parameters = prefect.context.get('parameters')
    logger.info(parameters[param])
    return f"{parameters[param]}"


with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    task_param_target_env = get_param_env_value('target_env')
    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        task_args={"name": "DBT Run", "environment": task_param_target_env},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
    # send_slack_alert_on_failure_dbt(dbt_run)

dbt_flow.set_reference_tasks([dbt_run])
```
… `task_args={"name": "DBT Run", "environment": task_param_target_env},` and use this instead: `task_args={"name": "DBT Run", "environment": "dev"},` it works.

Kevin Kho
05/31/2022, 4:00 PM
… `dbt()` …
```python
dbt_run = dbt(..., environment=target_env)
```
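Passing `target_env` as a call argument, instead of inside `task_args`, lets the flow substitute the Parameter's value at run time. The toy model below is plain Python and does not use Prefect's API. `Placeholder`, `ToyTask` and `run_flow` are hypothetical names. Call arguments are stored as placeholders and resolved when the flow runs, while `task_args` is copied as-is when the flow is built.

```python
from dataclasses import dataclass, field


@dataclass
class Placeholder:
    """Hypothetical stand-in for a Parameter: only a name until the flow runs."""
    name: str


@dataclass
class ToyTask:
    """Hypothetical task with build-time settings and run-time call arguments."""
    settings: dict = field(default_factory=dict)   # plays the role of task_args
    call_args: dict = field(default_factory=dict)  # plays the role of dbt(...) kwargs

    def bind(self, task_args=None, **call_args):
        # Build time: task_args is copied as-is, placeholders included
        self.settings.update(task_args or {})
        # Call arguments are stored now and resolved later
        self.call_args = call_args

    def run(self, environment=None):
        return {
            "from_task_args": self.settings.get("environment"),
            "from_call_args": environment,
        }


def run_flow(task, parameter_values):
    # Run time: swap each placeholder call argument for the Parameter's value
    resolved = {
        key: parameter_values[value.name] if isinstance(value, Placeholder) else value
        for key, value in task.call_args.items()
    }
    return task.run(**resolved)


target_env = Placeholder("target_env")
dbt = ToyTask()
dbt.bind(task_args={"environment": target_env}, environment=target_env)
result = run_flow(dbt, {"target_env": "dev"})
print(result["from_call_args"])  # dev
print(result["from_task_args"])  # Placeholder(name='target_env')
```

The call-argument route yields the `dev` string the profile needs. The `task_args` route still holds the placeholder object itself, which is analogous to the task object that ended up in profiles.yml.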
Mateo Merlo
05/31/2022, 4:04 PM
```python
with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    # task_param_target_env = get_param_env_value('target_env')
    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        environment=target_env,
        task_args={"name": "DBT Run"},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
```
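This attempt passes `environment` as a call argument, but the stock `DbtShellTask.run()` does not declare an `environment` parameter. My understanding of Prefect 1.x is that it binds call arguments against the `run()` signature when the flow is built, so an unknown keyword is rejected before anything runs. The stdlib sketch below reproduces that kind of check with `inspect.signature(...).bind`. `stock_run` and `subclass_run` are hypothetical stand-ins. The parameters of `stock_run` are assumed to mirror `DbtShellTask.run()`.

```python
import inspect


def stock_run(command=None, env=None, helper_script=None, dbt_kwargs=None):
    """Hypothetical stand-in assumed to mirror DbtShellTask.run()'s keywords."""


def subclass_run(environment=None, command=None, env=None, helper_script=None, dbt_kwargs=None):
    """Hypothetical run() of a subclass that also accepts `environment`."""


def accepts(run, **call_args):
    # True if run() could be called with these keyword arguments
    try:
        inspect.signature(run).bind(**call_args)
    except TypeError:
        return False
    return True


call_args = {"command": "dbt run", "env": {}, "environment": "dev"}
print(accepts(stock_run, **call_args))     # False: unexpected keyword 'environment'
print(accepts(subclass_run, **call_args))  # True
```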
Kevin Kho
05/31/2022, 4:07 PM
… `DbtShellTask` to take in `environment` in the `run` …

Mateo Merlo
05/31/2022, 4:12 PM
```python
pull_repo_task = pull_dbt_repo()
dbt_run = dbt(
    command=format_command(target_env),
    env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
    upstream_tasks=[pull_repo_task],
    task_args={"name": "DBT Run", "environment": "dev"},
)
```
works.

Kevin Kho
05/31/2022, 4:17 PM
`task_args` there is evaluated at build time, but Parameters only have values at run time. That’s why your environment is a weird-looking task class instead of a string: a `Parameter`, or a `FunctionTask` in the `get_param_env_value` version. When `task_args` is evaluated, the environment value is not known yet.

Mateo Merlo
05/31/2022, 4:19 PM

Kevin Kho
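05/31/2022, 4:22 PM
```python
from prefect.tasks.dbt import DbtShellTask

class MyDbtShellTask(DbtShellTask):
    def run(self, environment=None, **kwargs):
        # Set the profiles.yml target from the run-time value, then run the stock task
        if environment is not None:
            self.environment = environment
        return super().run(**kwargs)
```
It will be something like that, and then you use `MyDbtShellTask` instead.

Mateo Merlo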
05/31/2022, 4:29 PM
… `DbtShellTask` with different environments (I only need "dev" and "prod") and using the `case` method:
```python
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

# dbt_dev and dbt_prod are DbtShellTask instances for the "dev" and "prod" targets (not shown)


@task
def format_command(target_env):
    return f"dbt run --profiles-dir . --target {target_env} "


@task
def is_target_dev(target_env):
    return target_env == "dev"


with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    is_dev_environment = is_target_dev(target_env)
    pull_repo_task = pull_dbt_repo()
    with case(is_dev_environment, True):
        dbt_run_dev = dbt_dev(
            command=format_command(target_env),
            env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
            upstream_tasks=[pull_repo_task],
            task_args={"name": "DBT Run STAGING"},
        )
    with case(is_dev_environment, False):
        dbt_run_prod = dbt_prod(
            command=format_command(target_env),
            env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
            upstream_tasks=[pull_repo_task],
            task_args={"name": "DBT Run PROD"},
        )
    # merge() passes on the result of whichever branch ran; without it the output
    # task would depend only on the prod branch and be skipped when target_env is "dev"
    dbt_run = merge(dbt_run_dev, dbt_run_prod)
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
```
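With two `case` blocks, only one branch runs and the other is skipped. Whatever consumes the dbt output therefore needs the result of the branch that actually ran. In Prefect 1.x that is the job of `merge` from `prefect.tasks.control_flow`. The plain-Python toy below roughly sketches that selection logic. `SKIPPED`, `run_branch` and `merge_results` are hypothetical names, not Prefect's API, and `dbt_dev`/`dbt_prod` here are toy functions named after the thread's tasks.

```python
SKIPPED = object()  # hypothetical marker for a branch that did not run


def run_branch(condition, expected, func, *args):
    # Roughly mimics a case(condition, expected) block: run func only on a match
    return func(*args) if condition == expected else SKIPPED


def merge_results(*results):
    # Roughly mimics merge(): return the first branch result that actually ran
    for result in results:
        if result is not SKIPPED:
            return result
    return SKIPPED


def dbt_dev(command):
    return f"[dev] {command}"


def dbt_prod(command):
    return f"[prod] {command}"


target_env = "dev"
is_dev_environment = target_env == "dev"
command = f"dbt run --profiles-dir . --target {target_env}"
dev_result = run_branch(is_dev_environment, True, dbt_dev, command)
prod_result = run_branch(is_dev_environment, False, dbt_prod, command)
print(merge_results(dev_result, prod_result))
# [dev] dbt run --profiles-dir . --target dev
```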
Kevin Kho
05/31/2022, 5:30 PM

Mateo Merlo
05/31/2022, 6:32 PM

Dileep Damodaran
06/01/2022, 7:31 AM

Kevin Kho
06/01/2022, 2:21 PM
`parameter.run()` does not help in any use case.
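Calling `parameter.run()` does not help, so the Parameter's value has to reach the task as a `run()` argument. The `MyDbtShellTask` subclass suggested earlier in the thread does exactly that. The toy below sketches the pattern without Prefect. `BaseShellTask` and `MyShellTask` are hypothetical stand-ins for `DbtShellTask` and its subclass. The profile layout (outputs keyed by the target name, plus a `target` entry) is an assumption modelled on profiles.yml, not the library's code.

```python
class BaseShellTask:
    """Hypothetical stand-in for DbtShellTask: builds a profiles.yml-style dict."""

    def __init__(self, profile_name, environment=None, dbt_kwargs=None):
        self.profile_name = profile_name
        self.environment = environment
        self.dbt_kwargs = dbt_kwargs or {}

    def run(self, command=None):
        # The target name comes from self.environment, whatever object it holds
        return {
            self.profile_name: {
                "outputs": {self.environment: self.dbt_kwargs},
                "target": self.environment,
            }
        }


class MyShellTask(BaseShellTask):
    def run(self, environment=None, command=None):
        # Set the target from a run-time argument, then defer to the parent
        if environment is not None:
            self.environment = environment
        return super().run(command=command)


task = MyShellTask("dbt_profile", dbt_kwargs={"type": "bigquery"})
profile = task.run(environment="dev", command="dbt run --target dev")
print(profile["dbt_profile"]["target"])  # dev
```

If `environment` were still a task object at this point, that object would become both the outputs key and the `target`. That is the same symptom as the `!!python/object` entry seen in profiles.yml.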