Apoorva Desai
06/28/2022, 8:59 PM
(code snippet moved to thread)
The two DbtShellTasks are identical except for the schema. I tried defining the DbtShellTask without schema outside of my flow and tried a few different things inside my flow:
transform_task.run(dbt_kwargs = {"schema": "schema_1"})
but I always get the same error:
ValueError: Could not infer an active Flow context while creating edge to <Task: DbtShellTask>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `DbtShellTask(...).run(...)`
I can tell I am doing something silly but can't seem to resolve it.
Wei Mei
06/28/2022, 9:18 PM
dbt = DbtShellTask(
return_all=True,
profile_name="snowflake",
environment="prod", #The env, change to dev if you are trying to do it locally
profiles_dir=Secret("DBT_PROFILE_DIR_T2").get(), #Change to the local dir where your profiles.yml is located
overwrite_profiles=False,
log_stdout=True,
helper_script="cd dbt",
log_stderr=True,
)
@task(name="DBT run", nout=2)
def dbt_run(clock, dbt_command):
if clock == "full_run" and dbt_command == "orders":
logger.info("DBT run all")
run = dbt.run(
command=f"dbt run",
)
else:
logger.info("DBT run select")
run = dbt.run(
command=f"dbt run -s {dbt_command}",
)
return run, dbt_command
@task(name="DBT test")
def dbt_test(clock, dbt_command):
if clock == "full_run" and dbt_command == "orders":
logger.info("DBT test all")
run = dbt.run(
command=f"dbt test",
)
else:
logger.info("DBT test select")
run = dbt.run(
command=f"dbt test -s {dbt_command}",
)
return run
run = dbt_run(clock=clock, dbt_command=dbt_command)
test = dbt_test(clock=clock, dbt_command=dbt_command)
Apoorva Desai
06/28/2022, 9:37 PM
Kevin Kho
06/28/2022, 9:56 PM
Apoorva Desai
06/28/2022, 9:57 PM
transform_task_schema_1 = DbtShellTask(
profile_name="default",
environment="prod",
dbt_kwargs={
"type": "snowflake",
"threads": 10,
"account": "xxx",
"role": "xxx",
"database": "xxx",
"warehouse": "xxx",
"schema": "schema_1",
"user": "xxx",
"password": xxx,
},
overwrite_profiles=True,
profiles_dir="/root/.dbt/",
helper_script=('cd /root/repo'),
return_all=True,
log_stderr=True
)(command="dbt seed")
transform_task_schema_1 = DbtShellTask(
profile_name="default",
environment="prod",
dbt_kwargs={
"type": "snowflake",
"threads": 10,
"account": "xxx",
"role": "xxx",
"database": "xxx",
"warehouse": "xxx",
"schema": "schema_2",
"user": "xxx",
"password": xxx,
},
overwrite_profiles=True,
profiles_dir="/root/.dbt/",
helper_script=('cd /root/repo'),
return_all=True,
log_stderr=True
)(command="dbt seed")
transform_task = DbtShellTask(
profile_name="default",
environment="prod",
dbt_kwargs={
"type": "snowflake",
"threads": 10,
"account": "xxx",
"role": "xxx",
"database": "xxx",
"warehouse": "xxx",
"schema": "schema_1",
"user": "xxx",
"password": xxx,
},
overwrite_profiles=True,
profiles_dir="/root/.dbt/",
helper_script=('cd /root/repo'),
return_all=True,
log_stderr=True
)
transform_task_1=transform_task.run(dbt_kwargs = {"schema": "schema_1"}, command="dbt seed")
transform_task_2=transform_task.run(dbt_kwargs = {"schema": "schema_2"}, command="dbt seed")
and I get a new error:
PermissionError: [Errno 13] Permission denied: '/root/.dbt/profiles.yml'
Kevin Kho
06/28/2022, 10:00 PM
transform_task_schema_1 = DbtShellTask(
profile_name="default",
environment="prod",
dbt_kwargs={
"type": "snowflake",
"threads": 10,
"account": "xxx",
"role": "xxx",
"database": "xxx",
"warehouse": "xxx",
"schema": "schema_1",
"user": "xxx",
"password": xxx,
},
overwrite_profiles=True,
profiles_dir="/root/.dbt/",
helper_script=('cd /root/repo'),
return_all=True,
log_stderr=True
)
transform_task_schema_2 = copy.deepcopy(transform_task_schema_1)
transform_task_schema_2.dbt_kwargs = {new_schema_here}
with Flow() as flow:
transform_task_1 = transform_task_schema_1(command="dbt seed")
transform_task_2 = transform_task_schema_2(command="dbt seed")
Apoorva Desai
06/28/2022, 10:01 PM
Task 'DbtShellTask': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/tasks/dbt/dbt.py", line 164, in run
dbt_kwargs = {**self.dbt_kwargs, **(dbt_kwargs or {})}
TypeError: 'set' object is not a mapping
transform_task_schema_2.dbt_kwargs = {new_schema_here}
needs to be transform_task_schema_2.dbt_kwargs = {"schema" : new_schema_here}
I think. After doing that:
22:40:32 Encountered an error while reading profiles:
22:40:32 ERROR: Runtime Error
required field "type" not found in profile default and target prod
22:40:32 Defined profiles:
22:40:32 - default
22:40:32
For more information on configuring profiles, please consult the dbt docs:
<https://docs.getdbt.com/docs/configure-your-profile>
22:40:32 Encountered an error:
Runtime Error
Could not run dbt
@Kevin Kho
Kevin Kho
06/28/2022, 10:51 PM
Apoorva Desai
06/28/2022, 10:59 PM