Just to close the loop: after a bit of trial and error I finally found the workaround. The tricky bit was figuring out how parent/child loggers work. I couldn't figure out why the task logger didn't have any handlers for me to modify. It turns out child loggers are created with no handlers of their own and their records propagate up to the parent, so you have to modify the parent's handlers directly.
shell_task = DbtShellTask(
    name="DBT",
    profile_name=APP,
    environment=DBT_ENVIRONMENT,
    stream_output=True,
    log_stderr=True,
)
# Emit dbt's JSON-formatted logs as-is for friendly display in Datadog.
formatter = logging.Formatter("%(message)s")
for handler in shell_task.logger.parent.handlers:
    if not isinstance(handler, CloudHandler):
        handler.setFormatter(formatter)
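For anyone else tripped up by the parent/child behaviour, here is a minimal sketch using only the standard library. The logger names `demo_parent` and `demo_parent.task` are made up for illustration and are not what Prefect uses internally. It just shows that the child has no handlers, and that changing the formatter on the parent's handler changes how the child's records come out.

```python
import io
import logging

# Hypothetical logger names standing in for a framework's parent and task loggers.
parent = logging.getLogger("demo_parent")
child = logging.getLogger("demo_parent.task")

# Attach a single handler to the parent only, writing to an in-memory buffer.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s - %(name)s | %(message)s"))
parent.addHandler(handler)
parent.setLevel(logging.INFO)

# The child has no handlers of its own; its records propagate up to the parent.
child_handler_count = len(child.handlers)
child_parent_is_parent = child.parent is parent

child.info('{"msg": "before"}')

# Swapping the formatter on the parent's handlers changes how the
# child's records are rendered, because the parent does the emitting.
for h in child.parent.handlers:
    h.setFormatter(logging.Formatter("%(message)s"))

child.info('{"msg": "after"}')

lines = stream.getvalue().splitlines()
print(child_handler_count, child_parent_is_parent)
print(lines)
```

The first record is written with the level and logger name prepended, the second is the bare JSON string, which is the effect I was after for the dbt output.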
The task is then executed like this (showing only the most relevant code):
@task
def execute_dbt(
    command,
    schema=None,
    select=None,
    exclude=None,
    full_refresh=False,
    **kwargs,
):
    dbt = init_dbt(schema)
    # Append the optional dbt flags only when they were requested.
    if select:
        command += f" --select {select}"
    if exclude:
        command += f" --exclude {exclude}"
    if full_refresh:
        command += " --full-refresh"
    # Forward any extra keyword arguments to the task's run method.
    dbt.run(command=command, **kwargs)

dbt_build = execute_dbt(
    task_args={"name": "dbt Build"},
    command="dbt --log-format json build",
    select=select,
    exclude=exclude,
    full_refresh=full_refresh,
    upstream_tasks=[dbt_dependencies],
)
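If you want to check the command string without running dbt, the flag handling can be pulled out into a plain function. This is only a sketch: `build_dbt_command` is a hypothetical helper name, not something from Prefect or dbt, and the `tag:daily` selector is a made-up example value.

```python
from typing import Optional


def build_dbt_command(
    command: str,
    select: Optional[str] = None,
    exclude: Optional[str] = None,
    full_refresh: bool = False,
) -> str:
    """Hypothetical helper: append optional dbt flags to a base command."""
    if select:
        command += f" --select {select}"
    if exclude:
        command += f" --exclude {exclude}"
    if full_refresh:
        command += " --full-refresh"
    return command


# Example with a made-up selector value.
example = build_dbt_command(
    "dbt --log-format json build",
    select="tag:daily",
    full_refresh=True,
)
print(example)
```

Keeping this separate from the task means the string assembly can be unit tested on its own, while the task body just calls the helper and passes the result to `dbt.run`.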