I want to run the same DbtShellTask on two differe...
# prefect-community
a
I want to run the same DbtShellTask on two different schemas in one flow. Is there an efficient way to do this? This is what I am currently doing, and it feels clunky and repetitive:
code snippet moved to thread
The two DbtShellTasks are identical except for the schema. I tried defining the DbtShellTask without schema outside of my flow and tried a few different things inside my flow:
transform_task.run(dbt_kwargs = {"schema": "schema_1"})
but I always get the same error:
ValueError: Could not infer an active Flow context while creating edge to <Task: DbtShellTask>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `DbtShellTask(...).run(...)`
I can tell I am doing something silly but can't seem to resolve it 😅
w
Hi! I totally don't know the answer but I am in the same boat as you! I am trying to use a different command="" for different clocks. Maybe this might work for you?
import prefect
from prefect import task
from prefect.client import Secret
from prefect.tasks.dbt import DbtShellTask

dbt = DbtShellTask(
    return_all=True,
    profile_name="snowflake",
    environment="prod",  # the env; change to dev if you are running locally
    profiles_dir=Secret("DBT_PROFILE_DIR_T2").get(),  # or the local dir where your profiles.yml is located
    overwrite_profiles=False,
    log_stdout=True,
    helper_script="cd dbt",
    log_stderr=True,
)

@task(name="DBT run", nout=2)
def dbt_run(clock, dbt_command):
    logger = prefect.context.get("logger")
    if clock == "full_run" and dbt_command == "orders":
        # Full run: build every model
        logger.info("DBT run all")
        run = dbt.run(command="dbt run")
    else:
        # Otherwise only build the selected model
        logger.info("DBT run select")
        run = dbt.run(command=f"dbt run -s {dbt_command}")

    return run, dbt_command

@task(name="DBT test")
def dbt_test(clock, dbt_command):
    logger = prefect.context.get("logger")
    if clock == "full_run" and dbt_command == "orders":
        # Full run: test every model
        logger.info("DBT test all")
        run = dbt.run(command="dbt test")
    else:
        # Otherwise only test the selected model
        logger.info("DBT test select")
        run = dbt.run(command=f"dbt test -s {dbt_command}")

    return run
In my flow: this is how I am calling the tasks.
run = dbt_run(clock=clock, dbt_command=dbt_command)
test = dbt_test(clock=clock, dbt_command=dbt_command)
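The branching in the two tasks above differs only in the dbt verb, so the command string could be built in one place. A minimal sketch in plain Python, assuming the same "full_run" / "orders" rule; `build_dbt_command`, "hourly" and "customers" are hypothetical names used only for illustration:

```python
def build_dbt_command(action: str, clock: str, dbt_command: str) -> str:
    """Return the dbt CLI string for one action ("run" or "test")."""
    if clock == "full_run" and dbt_command == "orders":
        # Full run: no selector, so dbt processes every model
        return f"dbt {action}"
    # Otherwise restrict dbt to the selected model
    return f"dbt {action} -s {dbt_command}"


full_run_cmd = build_dbt_command("run", "full_run", "orders")
select_test_cmd = build_dbt_command("test", "hourly", "customers")
print(full_run_cmd)
print(select_test_cmd)
```

Each task body would then reduce to a single `dbt.run(command=build_dbt_command(...))` call.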
a
oooh, thank you! I'll try something similar and see if it works for me.
k
Hey Apoorva, could you move the first code snippet to the thread when you get the chance to keep the main channel a bit more compact?
✅ 1
a
Sure:
transform_task_schema_1 = DbtShellTask(
            profile_name="default",
            environment="prod",
            dbt_kwargs={
                "type": "snowflake",
                "threads": 10,
                "account": "xxx",
                "role": "xxx",
                "database": "xxx",
                "warehouse": "xxx",
                "schema": "schema_1",
                "user": "xxx",
                "password": xxx,
            },
            overwrite_profiles=True,
            profiles_dir="/root/.dbt/",
            helper_script=('cd /root/repo'),
            return_all=True,
            log_stderr=True
        )(command="dbt seed")

    transform_task_schema_2 = DbtShellTask(
            profile_name="default",
            environment="prod",
            dbt_kwargs={
                "type": "snowflake",
                "threads": 10,
                "account": "xxx",
                "role": "xxx",
                "database": "xxx",
                "warehouse": "xxx",
                "schema": "schema_2",
                "user": "xxx",
                "password": xxx,
            },
            overwrite_profiles=True,
            profiles_dir="/root/.dbt/",
            helper_script=('cd /root/repo'),
            return_all=True,
            log_stderr=True
        )(command="dbt seed")
@Kevin Kho, Wei Mei's solution didn't really work for me.
Now I am trying this inside `with Flow`:
transform_task = DbtShellTask(
            profile_name="default",
            environment="prod",
            dbt_kwargs={
                "type": "snowflake",
                "threads": 10,
                "account": "xxx",
                "role": "xxx",
                "database": "xxx",
                "warehouse": "xxx",
                "schema": "schema_1",
                "user": "xxx",
                "password": xxx,
            },
            overwrite_profiles=True,
            profiles_dir="/root/.dbt/",
            helper_script=('cd /root/repo'),
            return_all=True,
            log_stderr=True
        )
    transform_task_1=transform_task.run(dbt_kwargs = {"schema": "schema_1"}, command="dbt seed")
    transform_task_2=transform_task.run(dbt_kwargs = {"schema": "schema_2"}, command="dbt seed")
and I get a new error:
PermissionError: [Errno 13] Permission denied: '/root/.dbt/profiles.yml'
@Kevin Kho Is it even possible to do what I am trying to do?
k
I would do:
transform_task_schema_1 = DbtShellTask(
            profile_name="default",
            environment="prod",
            dbt_kwargs={
                "type": "snowflake",
                "threads": 10,
                "account": "xxx",
                "role": "xxx",
                "database": "xxx",
                "warehouse": "xxx",
                "schema": "schema_1",
                "user": "xxx",
                "password": xxx,
            },
            overwrite_profiles=True,
            profiles_dir="/root/.dbt/",
            helper_script=('cd /root/repo'),
            return_all=True,
            log_stderr=True
        )
transform_task_schema_2 = copy.deepcopy(transform_task_schema_1)  # needs "import copy" at the top
transform_task_schema_2.dbt_kwargs = {new_schema_here}

with Flow() as flow:
    transform_task_1 = transform_task_schema_1(command="dbt seed")
    transform_task_2 = transform_task_schema_2(command="dbt seed")
I think that would work. Basically, make a copy of the task and edit the attribute on the copy itself.
a
ah okay, let me try that. brb
The second DbtShellTask fails with:
Task 'DbtShellTask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/dbt/dbt.py", line 164, in run
    dbt_kwargs = {**self.dbt_kwargs, **(dbt_kwargs or {})}
TypeError: 'set' object is not a mapping
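The last frame of that traceback can be reproduced without Prefect. A minimal sketch, assuming `new_schema_here` is just a string: braces around a bare value create a set, and a set cannot be unpacked with `**` the way the merge in `dbt.py` does.

```python
new_schema_here = "schema_2"  # assumed placeholder value

as_set = {new_schema_here}              # braces around a bare value -> set
as_dict = {"schema": new_schema_here}   # key: value pair -> dict

error_message = None
try:
    # Same shape as the merge in the traceback: {**self.dbt_kwargs, **(dbt_kwargs or {})}
    merged = {**as_set, **{}}
except TypeError as exc:
    error_message = str(exc)

print(type(as_set).__name__, type(as_dict).__name__)
print(error_message)
```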
transform_task_schema_2.dbt_kwargs = {new_schema_here}
needs to be
transform_task_schema_2.dbt_kwargs = {"schema" : new_schema_here}
I think. After doing that:
22:40:32  Encountered an error while reading profiles:
22:40:32    ERROR: Runtime Error
  required field "type" not found in profile default and target prod
22:40:32  Defined profiles:
22:40:32   - default
22:40:32
For more information on configuring profiles, please consult the dbt docs:

https://docs.getdbt.com/docs/configure-your-profile

22:40:32  Encountered an error:
Runtime Error
  Could not run dbt
@Kevin Kho
k
I think you need to pass the entire dbt_kwargs dict, not just the schema. Assigning to the attribute replaces the whole dict, so the other fields like "type" are gone.
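A rough sketch of why the partial dict fails and the full one works, in plain Python. `FakeDbtTask` is a hypothetical stand-in, not the real `DbtShellTask`; only its merge line is taken from the traceback above, and the kwargs are trimmed to three keys for brevity.

```python
import copy


class FakeDbtTask:
    """Hypothetical stand-in that only mimics how dbt_kwargs are merged."""

    def __init__(self, dbt_kwargs):
        self.dbt_kwargs = dbt_kwargs

    def run(self, dbt_kwargs=None):
        # Merge line as shown in the traceback from prefect/tasks/dbt/dbt.py
        return {**self.dbt_kwargs, **(dbt_kwargs or {})}


task_1 = FakeDbtTask({"type": "snowflake", "threads": 10, "schema": "schema_1"})

# Replacing the attribute with only the schema drops "type" and the rest
broken = copy.deepcopy(task_1)
broken.dbt_kwargs = {"schema": "schema_2"}

# Keeping every original key and overriding only the schema
task_2 = copy.deepcopy(task_1)
task_2.dbt_kwargs = {**task_1.dbt_kwargs, "schema": "schema_2"}

print(broken.run())
print(task_2.run())
```

Because `copy.deepcopy` also copies the dict, setting `task_2.dbt_kwargs["schema"] = "schema_2"` on the copy should have the same effect without touching `task_1`.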
a
That worked
thanks