Zhibin Dai
02/28/2022, 9:33 PMwith Flow(...)
block. If you're trying to run this task outside of a Flow context, you need to call `DbtShellTask(...).run(...). My code is below. Any ideas what's wrong? Thank you!Kevin Kho
02/28/2022, 9:35 PM@task
def run_dbt_command(command, dbt_tag, dbt_kwargs):
logger = prefect.context.get("logger")
command_to_run = f"{command['command']}{dbt_tag}" if 'tag:' in command['command'] else command['command']
# upstream_tasks = [upstream_task] if len(reference_tasks) == 0 else reference_tasks
run_cmd = dbt(
command=command_to_run,
task_args=command['task_args'],
dbt_kwargs=dbt_kwargs
)
return run_cmd
You can’t use dbt
but you can do dbt.run()
Zhibin Dai
02/28/2022, 9:39 PMdbt = DbtShellTask(
profile_name=DBT_PROJECT,
profiles_dir="./",
environment="dev",
log_stdout=True,
log_stderr=True,
return_all=True,
helper_script=f"cd {DBT_REPO}",
dbt_kwargs={
"type": "snowflake",
"client_session_keep_alive": False
}
)
@task
def run_dbt_command(command, dbt_tag, dbt_kwargs):
logger = prefect.context.get("logger")
command_to_run = f"{command['command']}{dbt_tag}" if 'tag:' in command['command'] else command['command']
# upstream_tasks = [upstream_task] if len(reference_tasks) == 0 else reference_tasks
run_cmd = dbt(
command=command_to_run,
task_args=command['task_args'],
dbt_kwargs=dbt_kwargs
)
return run_cmd
with Flow(FLOW_NAME, storage=STORAGE, executor=LocalExecutor(), run_config=LocalRun(labels=["dev"]),) as flow:
snowflake_user = PrefectSecret("SF_USER")
snowflake_pass = PrefectSecret("SF_PASS")
snowflake_role = PrefectSecret("SF_ROLE")
snowflake_accid = PrefectSecret("SF_ACCTID")
git_token = PrefectSecret(GITHUB_ACCESS_TOKEN)
env = Parameter("env", default="dev")
dbt_repo = Parameter("dbt_repo_url", default=DBT_REPO)
dbt_config_url = Parameter('dbt_config_url')
dbt_tag = Parameter('dbt_tag')
del_repo = delete_folder(DBT_REPO)
dbt_repo_task = pull_git_repo(
git_token=git_token,
repo_name=dbt_repo,
checkout_branch="dbt_run_configs"
)
dbt_config_task = pull_git_json_config_file(
url=dbt_config_url,
git_token=git_token,
task_args={'trigger': all_successful},
upstream_tasks=[dbt_repo_task]
)
dbt_kwargs = get_dict_kwargs(
user=snowflake_user,
password=snowflake_pass,
role=snowflake_role,
account=snowflake_accid,
upstream_tasks=[dbt_config_task]
)
dbt_commands = create_commands_ls(dbt_config_task)
reference_tasks = []
run_cmd = run_dbt_command.map(dbt_commands, dbt_tag=unmapped(dbt_tag), dbt_kwargs=unmapped(dbt_kwargs))
run_cmd_out = print_prefect_output(run_cmd)
reference_tasks.append(run_cmd)
del_repo_again = delete_folder(DBT_REPO)
flow.set_reference_tasks(reference_tasks)
Kevin Kho
02/28/2022, 9:42 PMZhibin Dai
02/28/2022, 9:42 PM└── 16:41:26 | INFO | Task 'run_dbt_command': Starting task run...
└── 16:41:26 | INFO | Task 'run_dbt_command': Finished task run for task with final state: 'Mapped'
└── 16:41:26 | INFO | Task 'run_dbt_command[0]': Starting task run...
└── 16:41:26 | ERROR | Task 'run_dbt_command[0]': Exception encountered during task execution!
Traceback (most recent call last):
File "/opt/homebrew/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/opt/homebrew/lib/python3.9/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "flows/dbt_flow.py", line 45, in run_dbt_command
run_cmd = dbt(
File "/opt/homebrew/lib/python3.9/site-packages/prefect/core/task.py", line 633, in __call__
new.bind(
File "/opt/homebrew/lib/python3.9/site-packages/prefect/core/task.py", line 693, in bind
raise ValueError(
ValueError: Could not infer an active Flow context while creating edge to <Task: dbt deps and dbt seed>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `DbtShellTask(...).run(...)`
└── 16:41:26 | INFO | Task 'run_dbt_command[0]': Finished task run for task with final state: 'Failed'
Kevin Kho
02/28/2022, 9:48 PMa = SomeTask() # calls the init
@task
def sometask():
a() # calls the run
if you can, you should do:
@task
def sometask():
x = SomeTask() # calls the init
return x
and then you can call it in the Flow:
with Flow() as flow:
sometask()
and the flow will call the run I thinkZhibin Dai
02/28/2022, 9:50 PMsometask()
should take in the array and map...sometask.map(dbt_commands)
?Kevin Kho
02/28/2022, 9:51 PMrun_cmd
? Does the dbt task return anything?Zhibin Dai
02/28/2022, 9:51 PMKevin Kho
02/28/2022, 9:52 PMdbt.run()
should work one second let me testfrom prefect import Flow, task, Task
import prefect
from prefect.utilities.tasks import defaults_from_attrs
class MyTask(Task):
def __init__(self, some_string, **kwargs):
self.some_string = some_string
super().__init__(**kwargs)
@defaults_from_attrs("some_string")
def run(self, some_string=None):
<http://prefect.context.logger.info|prefect.context.logger.info>(some_string + " ran")
return some_string + " ran"
mytask = MyTask("kevin")
@task
def something(some_string):
test = mytask.run(some_string)
return test
with Flow("name") as flow:
something.map(["kevin","zhibin"])
flow.run()
Zhibin Dai
02/28/2022, 10:00 PMKevin Kho
02/28/2022, 10:02 PM