Mike B
06/29/2023, 12:09 AM
I'm hitting this error any time I try to trigger a DbtCoreOperation from a deployment. I'm running the most recent versions of the Prefect server and task runner, and of the prefect_dbt and prefect_airbyte pip packages, on an Ubuntu 22.04 server. The flow uses the default local file storage, and a local agent runs on the same server as the Prefect server process. The exception below is raised as soon as either "run()" or "trigger()" is called on any DbtCoreOperation. The workflow runs perfectly when I run the script manually, but fails on the dbt steps as soon as I run it through a deployment. The Airbyte operation runs fine in both environments, so this is specific to the dbt operations. Any help would be much appreciated! Error and code below:
  File "/usr/lib/python3.10/subprocess.py", line 1845, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'bash'
import sys

from prefect import flow
from prefect_dbt.cli import DbtCoreOperation
from prefect_airbyte.server import AirbyteServer
from prefect_airbyte.connections import AirbyteConnection

DBT_PROJECT_DIR = "<path to dbt project>"
DBT_PROFILE_DIR = "<path to dbt project>"
AIRBYTE_CONNECTION = "<connection id>"

airbyte_server = AirbyteServer(server_host="<host>", server_port=<port>)


@flow(log_prints=True)
def dbt_snapshot():
    with DbtCoreOperation(commands=["dbt snapshot"],
                          project_dir = DBT_PROJECT_DIR,
                          profiles_dir = DBT_PROFILE_DIR) as dbt_run:
        dbt_process = dbt_run.trigger()
        dbt_process.wait_for_completion()
        dbt_output = dbt_process.fetch_result()
    return dbt_output


@flow(log_prints=True)
def dbt_model():
    result = DbtCoreOperation(
        commands = [f"dbt run --select models/*"],
        project_dir = DBT_PROJECT_DIR,
        profiles_dir = DBT_PROFILE_DIR
    ).run()
    return result


@flow(log_prints=True)
def airbyte_sync():
    connection = AirbyteConnection(
        connection_id = AIRBYTE_CONNECTION,
        airbyte_server = airbyte_server
    )
    job_run = connection.trigger()
    job_run.wait_for_completion()
    res = job_run.fetch_result()
    return res


@flow(log_prints=True)
def dbt_airbyte_sync():
    dbt_snapshot()
    dbt_model()
    airbyte_sync()


if __name__ == "__main__":
    try:
        dbt_airbyte_sync()
    except Exception as err:
        print("Exception while running workflow", err)
        sys.exit(1)
    sys.exit(0)
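
The traceback shows Python's subprocess module failing to find an executable named 'bash'. One possible explanation (an assumption, not confirmed anywhere in this thread) is that the agent process has a different PATH than the interactive shell where the script works. The sketch below uses only the standard library to report where 'bash' resolves and which PATH entries are searched. The helper name describe_shell_lookup is hypothetical and is not part of Prefect or prefect_dbt.

```python
import os
import shutil


def describe_shell_lookup(env=None):
    """Report where 'bash' resolves for a given environment mapping.

    Hypothetical diagnostic helper: `env` defaults to the current
    process environment, which is what a subprocess would inherit.
    """
    env = os.environ if env is None else env
    search_path = env.get("PATH", "")
    return {
        # None means 'bash' was not found on the searched PATH.
        "bash_path": shutil.which("bash", path=search_path),
        # Non-empty PATH entries, in lookup order.
        "path_entries": [p for p in search_path.split(os.pathsep) if p],
    }


if __name__ == "__main__":
    report = describe_shell_lookup()
    print("bash resolved to:", report["bash_path"])
    for entry in report["path_entries"]:
        print("PATH entry:", entry)
```

Running this once from the shell where the script succeeds, and once from inside a flow run started by the deployment (for example by printing the result of describe_shell_lookup() at the top of dbt_airbyte_sync), would show whether the two environments see the same PATH.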