Nate Joselson
09/08/2020, 7:47 AM
DbtShellTask with a dynamic set of dbt_kwargs that are defined at run time. The task works when I define the dbt_kwargs in the task initialization, but not when they are provided at runtime.
from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtShellTask

@task
def generate_dbt_kwargs_dict(project_id):
    dbt_kwargs_dict = {
        'type': 'bigquery',
        'method': 'service-account',
        'keyfile': 'key.json',
        'project': project_id,
        'dataset': 'derived_data',
        'location': 'EU',
    }
    return dbt_kwargs_dict

dbt_run_task = DbtShellTask(
    name='dbt_run_task',
    command='dbt run',
    profile_name='default',
    environment='test',
    overwrite_profiles=True,
    profiles_dir=profiles_path,
)

with Flow(name="dbt_flow") as f:
    project_id = Parameter("project_id")
    dbt_kwargs_dict = generate_dbt_kwargs_dict(project_id)
    run_task = dbt_run_task(dbt_kwargs=dbt_kwargs_dict)

out = f.run(project_id='bq-project')
This returns a Prefect error:
ERROR - prefect.TaskRunner | Unexpected error: TypeError("'NoneType' object is not a mapping",)
I assume this means that the dbt_kwargs dictionary is None, but I can’t see why, since I am passing the dictionary from the previous task…
Does anyone have an idea about this?
Thanks!
Nate Joselson
09/08/2020, 8:13 AM
DbtShellTask with an empty dictionary and then add to it in the run. I misunderstood the documentation https://docs.prefect.io/api/latest/tasks/dbt.html#dbtshelltask
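The error and the empty-dictionary workaround can be reproduced without Prefect. The sketch below is only an illustration: StandInTask is a hypothetical class, not DbtShellTask, and the way it merges the two dictionaries is an assumption made to match the TypeError reported above, not a copy of the library's code.

```python
from typing import Optional


class StandInTask:
    """Hypothetical stand-in for illustration; this is not Prefect's DbtShellTask."""

    def __init__(self, dbt_kwargs: Optional[dict] = None):
        # Assumption: the init-time value is stored as given, so it stays None
        # when the caller passes nothing.
        self.dbt_kwargs = dbt_kwargs

    def run(self, dbt_kwargs: Optional[dict] = None) -> dict:
        # Assumed merge: init-time values first, run-time values override them.
        # Unpacking None with ** raises TypeError.
        return {**self.dbt_kwargs, **(dbt_kwargs or {})}


runtime_kwargs = {"type": "bigquery", "project": "bq-project"}

# Nothing supplied at initialization: the merge fails on the None value.
try:
    StandInTask().run(dbt_kwargs=runtime_kwargs)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Workaround from this thread: initialize with an empty dict, fill it at run time.
merged = StandInTask(dbt_kwargs={}).run(dbt_kwargs=runtime_kwargs)

print(error_message)
print(merged)
```

Applied to the flow above, this would presumably mean adding dbt_kwargs={} to the DbtShellTask(...) initialization and leaving the dbt_run_task(dbt_kwargs=dbt_kwargs_dict) call inside the Flow block unchanged.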
Maybe it should be changed to default to an empty dict, or the documentation should no longer say that it can also be provided at runtime, since something needs to be provided at initialization…
Jenny
09/08/2020, 2:10 PM
Nate Joselson
09/08/2020, 3:17 PM
None so that the dictionary modification I mentioned above works as expected.
Thanks so much!
Jenny
09/08/2020, 3:41 PM
Nate Joselson
09/09/2020, 7:28 AM