Hi all, im trying to map dbt commands to a task, r...
# prefect-server
z
Hi all, I'm trying to map dbt commands to a task, `run_dbt_command`, but I keep getting this error: "Could not infer an active Flow context while creating edge to <Task: dbt run tag:>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `DbtShellTask(...).run(...)`". My code is below. Any ideas what's wrong? Thank you!
k
Hey @Zhibin Dai, could you move the code to the thread to make the channel cleaner? I think the issue here is that you are calling a task inside another task:
Copy code
@task
def run_dbt_command(command, dbt_tag, dbt_kwargs):
	"""Run a single dbt command through the module-level ``dbt`` task.

	Args:
		command: mapping read for its 'command' and 'task_args' keys.
		dbt_tag: appended to the command string when that string contains 'tag:'.
		dbt_kwargs: forwarded unchanged as the ``dbt_kwargs`` argument.

	Returns:
		Whatever the ``dbt(...)`` call evaluates to.
	"""
	# Logger is fetched from the Prefect context but not used below.
	logger = prefect.context.get("logger")

	# Only commands containing 'tag:' get the tag appended; others run as given.
	command_to_run = f"{command['command']}{dbt_tag}" if 'tag:' in command['command'] else command['command']	
	# upstream_tasks = [upstream_task] if len(reference_tasks) == 0 else reference_tasks

	# NOTE: `dbt` is a Task instance, so `dbt(...)` goes through Task.__call__,
	# which tries to bind the task into a Flow. No Flow context is active while
	# this task is running, so this is the call that raises "Could not infer an
	# active Flow context"; the error message points to `.run(...)` instead.
	run_cmd = dbt(
				command=command_to_run,
				task_args=command['task_args'],
				dbt_kwargs=dbt_kwargs
	)

	return run_cmd
You can’t call `dbt(...)` inside another task, but you can call `dbt.run(...)`.
z
Copy code
# Shared dbt shell task: built once at module level and reused by run_dbt_command.
dbt = DbtShellTask(
    # --- dbt profile settings ---
    profile_name=DBT_PROJECT,
    profiles_dir="./",
    environment="dev",
    dbt_kwargs={
        "type": "snowflake",
        "client_session_keep_alive": False,
    },
    # --- shell behaviour ---
    # Shell snippet that changes into DBT_REPO; see the DbtShellTask docs for
    # when it runs relative to the dbt command.
    helper_script=f"cd {DBT_REPO}",
    return_all=True,
    log_stdout=True,
    log_stderr=True,
)

@task
def run_dbt_command(command, dbt_tag, dbt_kwargs):
	"""Run a single dbt command through the module-level ``dbt`` task.

	Mapped over the list of commands in the flow below.

	Args:
		command: mapping read for its 'command' and 'task_args' keys.
		dbt_tag: appended to the command string when that string contains 'tag:'.
		dbt_kwargs: forwarded unchanged as the ``dbt_kwargs`` argument.

	Returns:
		Whatever the ``dbt(...)`` call evaluates to.
	"""
	# Logger is fetched from the Prefect context but not used below.
	logger = prefect.context.get("logger")

	# Only commands containing 'tag:' get the tag appended; others run as given.
	command_to_run = f"{command['command']}{dbt_tag}" if 'tag:' in command['command'] else command['command']	
	# upstream_tasks = [upstream_task] if len(reference_tasks) == 0 else reference_tasks

	# NOTE: `dbt` is a Task instance, so `dbt(...)` goes through Task.__call__,
	# which tries to bind the task into a Flow. No Flow context is active while
	# this task is running, so this is the call that raises "Could not infer an
	# active Flow context"; the error message points to `.run(...)` instead.
	run_cmd = dbt(
				command=command_to_run,
				task_args=command['task_args'],
				dbt_kwargs=dbt_kwargs
	)

	return run_cmd


# Flow definition: pull the dbt repo and a JSON config, build the dbt kwargs,
# then map run_dbt_command over the commands derived from the config.
# Helper tasks (delete_folder, pull_git_repo, pull_git_json_config_file,
# get_dict_kwargs, create_commands_ls, print_prefect_output) are not shown here.
with Flow(FLOW_NAME, storage=STORAGE, executor=LocalExecutor(), run_config=LocalRun(labels=["dev"]),) as flow:
	# Credentials and the GitHub token are read from Prefect secrets.
	snowflake_user = PrefectSecret("SF_USER")
	snowflake_pass = PrefectSecret("SF_PASS")
	snowflake_role = PrefectSecret("SF_ROLE")
	snowflake_accid = PrefectSecret("SF_ACCTID")
	git_token = PrefectSecret(GITHUB_ACCESS_TOKEN)

	# Runtime parameters; only `env` and `dbt_repo_url` have defaults.
	# `env` is not referenced again in this snippet.
	env = Parameter("env", default="dev")
	dbt_repo = Parameter("dbt_repo_url", default=DBT_REPO)
	dbt_config_url = Parameter('dbt_config_url')
	dbt_tag = Parameter('dbt_tag')
	
	# NOTE(review): nothing below lists del_repo as an upstream task, so no
	# visible edge orders it before the repo pull — confirm this is intended.
	del_repo = delete_folder(DBT_REPO)

	dbt_repo_task = pull_git_repo(
        git_token=git_token,
        repo_name=dbt_repo,
        checkout_branch="dbt_run_configs"
	)

	# Runs after the repo pull (explicit upstream) with an all_successful trigger.
	dbt_config_task = pull_git_json_config_file(
            url=dbt_config_url,
            git_token=git_token,
            task_args={'trigger': all_successful},
   	        upstream_tasks=[dbt_repo_task]
    )

	# Built only after the config task, via the explicit upstream dependency.
	dbt_kwargs = get_dict_kwargs(
		user=snowflake_user,
		password=snowflake_pass,
		role=snowflake_role,
		account=snowflake_accid,
        upstream_tasks=[dbt_config_task]
	)

	dbt_commands = create_commands_ls(dbt_config_task)

	reference_tasks = []

	# One mapped run per entry of dbt_commands; dbt_tag and dbt_kwargs are
	# wrapped in unmapped() so each mapped run receives them whole.
	run_cmd = run_dbt_command.map(dbt_commands, dbt_tag=unmapped(dbt_tag), dbt_kwargs=unmapped(dbt_kwargs))
	run_cmd_out = print_prefect_output(run_cmd)
	reference_tasks.append(run_cmd)

	# NOTE(review): no upstream_tasks here, so nothing visible forces this
	# cleanup to run after the mapped dbt runs — confirm.
	del_repo_again = delete_folder(DBT_REPO)

# Only the mapped dbt run is registered as a reference task for the flow.
flow.set_reference_tasks(reference_tasks)
@Kevin Kho I changed it to `dbt.run()` but it's the same error.
k
I can’t tell immediately. Can you give me a longer traceback?
z
Copy code
└── 16:41:26 | INFO    | Task 'run_dbt_command': Starting task run...
└── 16:41:26 | INFO    | Task 'run_dbt_command': Finished task run for task with final state: 'Mapped'
└── 16:41:26 | INFO    | Task 'run_dbt_command[0]': Starting task run...
└── 16:41:26 | ERROR   | Task 'run_dbt_command[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/opt/homebrew/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "flows/dbt_flow.py", line 45, in run_dbt_command
    run_cmd = dbt(
  File "/opt/homebrew/lib/python3.9/site-packages/prefect/core/task.py", line 633, in __call__
    new.bind(
  File "/opt/homebrew/lib/python3.9/site-packages/prefect/core/task.py", line 693, in bind
    raise ValueError(
ValueError: Could not infer an active Flow context while creating edge to <Task: dbt deps and dbt seed>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `DbtShellTask(...).run(...)`
└── 16:41:26 | INFO    | Task 'run_dbt_command[0]': Finished task run for task with final state: 'Failed'
So for context, I'm trying to pass in a list of dbt commands and map them to a task so that I can create a "generic" dbt flow that can take in any arbitrary list of dbt commands and run them all sequentially. Is this something that's doable / a common pattern with Prefect?
k
I see the issue. It comes from doing this:
Copy code
a = SomeTask() # calls the init

@task
def sometask():
    # Meant to call the run, but `a()` goes through Task.__call__, which tries
    # to bind to an active Flow — the failing pattern shown in the traceback.
    a()   # calls the run
If you can, you should instead do:
Copy code
@task
def sometask():
    # The Task is constructed inside the running task; this snippet returns the
    # Task instance itself and does not call its run().
    x = SomeTask() # calls the init
    return x
and then you can call it in the Flow:
Copy code
# Flow needs a name (as in the other snippets in this thread); the task is
# called inside the `with Flow(...)` block so it is added to `flow`.
with Flow("example") as flow:
    sometask()
and the flow will call the run, I think.
z
So `sometask()` should take in the array and map, i.e. `sometask.map(dbt_commands)`?
k
What is `run_cmd`? Does the dbt task return anything?
z
I think it should return the output of the dbt shell task?
It gets passed to the `print_prefect_output` task and is set as a reference task for the flow.
k
I think doing `dbt.run()` should work. One second, let me test.
Isn’t it the same as this?
Copy code
from prefect import Flow, task, Task 
import prefect
from prefect.utilities.tasks import defaults_from_attrs

class MyTask(Task):
    """Minimal Task subclass that logs and returns ``some_string + " ran"``."""

    def __init__(self, some_string, **kwargs):
        # Kept on the instance so `defaults_from_attrs` can use it as the
        # fallback when run() is called without a value.
        self.some_string = some_string
        super().__init__(**kwargs)

    @defaults_from_attrs("some_string")
    def run(self, some_string=None):
        """Log ``"<some_string> ran"`` and return that same string."""
        # The pasted snippet had a Slack auto-link (`<http://...|...>`) in
        # place of this call; restored to the plain attribute access.
        prefect.context.logger.info(some_string + " ran")
        return some_string + " ran"

# Task instance created once at module level.
mytask = MyTask("kevin")

@task
def something(some_string):
    """Call ``mytask.run`` directly with ``some_string`` and return its result."""
    # `.run(...)` executes the task body as a plain method call; unlike
    # `mytask(...)`, it does not try to bind the task into a Flow.
    test = mytask.run(some_string)
    return test

with Flow("name") as flow:
    # One mapped run of `something` per list element.
    something.map(["kevin","zhibin"])

flow.run()
z
yeah i think so
I think calling `.run` is working; I had it in the wrong place initially. I'm getting some other errors now and will let you know if the issue persists. Thanks for your help!
k
Sounds good