Nate Joselson
09/08/2020, 7:47 AM
I am trying to use DbtShellTask with a dynamic set of dbt_kwargs that are defined at run time. The task works when I define the dbt_kwargs in the task initialization, but not when they are provided at runtime.
from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtShellTask

@task
def generate_dbt_kwargs_dict(project_id):
    dbt_kwargs_dict = {
        'type': 'bigquery',
        'method': 'service-account',
        'keyfile': 'key.json',
        'project': project_id,
        'dataset': 'derived_data',
        'location': 'EU',
    }
    return dbt_kwargs_dict

dbt_run_task = DbtShellTask(
    name='dbt_run_task',
    command='dbt run',
    profile_name='default',
    environment='test',
    overwrite_profiles=True,
    profiles_dir=profiles_path,
)

with Flow(name="dbt_flow") as f:
    project_id = Parameter("project_id")
    dbt_kwargs_dict = generate_dbt_kwargs_dict(project_id)
    run_task = dbt_run_task(dbt_kwargs=dbt_kwargs_dict)

out = f.run(project_id='bq-project')
This returns a Prefect error:
ERROR - prefect.TaskRunner | Unexpected error: TypeError("'NoneType' object is not a mapping",)
I assume this means that the dbt_kwargs dictionary is None, but I can’t see why, since I am passing the dictionary from the previous task…
Does anyone have an idea about this?
Thanks!
I got it working by initializing the DbtShellTask with an empty dictionary and then adding to it in the run. I misunderstood the documentation: https://docs.prefect.io/api/latest/tasks/dbt.html#dbtshelltask
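For anyone hitting the same error, here is a minimal plain-Python sketch of what seems to be going on. It assumes the task merges the initialization-time dbt_kwargs with the runtime ones by dictionary unpacking; `merge_profile_kwargs` is a hypothetical stand-in written for illustration, not Prefect's actual code.

```python
def merge_profile_kwargs(init_kwargs, run_kwargs):
    """Hypothetical stand-in for merging init-time and run-time dbt_kwargs."""
    # Unpacking None with ** raises a TypeError, because None is not a mapping.
    return {**init_kwargs, **run_kwargs}


run_kwargs = {"type": "bigquery", "project": "bq-project"}

# Init-time value left as None: the merge fails before the run-time dict is used.
error_message = None
try:
    merge_profile_kwargs(None, run_kwargs)
except TypeError as exc:
    error_message = str(exc)

# Init-time value set to an empty dict: the run-time values are merged in.
merged = merge_profile_kwargs({}, run_kwargs)
```

Under that assumption, the None in the error would be the initialization-time value rather than the dictionary passed from the upstream task, which matches passing an empty dictionary at initialization making the runtime dictionary work.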
Maybe the default should be changed to an empty dict, or the documentation should stop saying that it can also be provided at runtime, since something needs to be provided at initialization…
Jenny
09/08/2020, 2:10 PM
Nate Joselson
09/08/2020, 3:17 PM
… None so that the dictionary modification I mentioned above works as expected.
Thanks so much!
Jenny
09/08/2020, 3:41 PM
Nate Joselson
09/09/2020, 7:28 AM