Mike B

06/29/2023, 12:09 AM
Wondering if anyone has run into something like this error before, and if so, how you resolved it:
  File "/usr/lib/python3.10/subprocess.py", line 1845, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'bash'
I'm hitting this error any time I try to trigger a DbtCoreOperation from a deployment. I'm running the most recent versions of the Prefect server and task runner, and the most recent versions of the prefect_dbt and prefect_airbyte pip packages, on an Ubuntu 22.04 server. The flow uses the default local file storage, with a local agent running on the same server as the Prefect server process. The exception is raised as soon as either "run()" or "trigger()" is called on any DbtCoreOperation. The workflow runs perfectly when I run the script manually, but fails on the dbt steps as soon as I run it through a deployment. The Airbyte operation runs fine in both environments, so this is specific to the dbt operations. Any help would be much appreciated! Code below:
import sys
from prefect import flow
from prefect_dbt.cli import DbtCoreOperation
from prefect_airbyte.server import AirbyteServer
from prefect_airbyte.connections import AirbyteConnection


DBT_PROJECT_DIR = "<path to dbt project>"
DBT_PROFILE_DIR = "<path to dbt project>"
AIRBYTE_CONNECTION = "<connection id>"

airbyte_server = AirbyteServer(server_host="<host>", server_port=<port>)

@flow(log_prints=True)
def dbt_snapshot():
    with DbtCoreOperation(
        commands=["dbt snapshot"],
        project_dir=DBT_PROJECT_DIR,
        profiles_dir=DBT_PROFILE_DIR,
    ) as dbt_run:
        dbt_process = dbt_run.trigger()
        dbt_process.wait_for_completion()
        dbt_output = dbt_process.fetch_result()
        return dbt_output

@flow(log_prints=True)
def dbt_model():
    result = DbtCoreOperation(
        commands=["dbt run --select models/*"],
        project_dir=DBT_PROJECT_DIR,
        profiles_dir=DBT_PROFILE_DIR,
    ).run()
    return result

@flow(log_prints=True)
def airbyte_sync():
    connection = AirbyteConnection(
        connection_id=AIRBYTE_CONNECTION,
        airbyte_server=airbyte_server,
    )
    job_run = connection.trigger()
    job_run.wait_for_completion()
    res = job_run.fetch_result()
    return res

@flow(log_prints=True)
def dbt_airbyte_sync():
    dbt_snapshot()
    dbt_model()
    airbyte_sync()

if __name__ == "__main__":
    try:
        dbt_airbyte_sync()
    except Exception as err:
        print("Exception while running workflow", err)
        sys.exit(1)
    sys.exit(0)
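
For comparing the two environments, here is a minimal diagnostic sketch using only the standard library. The traceback above comes from `subprocess`, and a `FileNotFoundError` naming 'bash' usually means the executable could not be found on the `PATH` of the process that tried to launch it. The assumption here (not confirmed anywhere in this thread) is that the agent process sees a different `PATH` than the interactive shell. The helper name `describe_shell_lookup` is hypothetical and not part of Prefect or prefect_dbt.

```python
import os
import shutil


def describe_shell_lookup(executable="bash", env=None):
    """Report whether `executable` can be resolved on the PATH of `env`.

    Hypothetical helper for illustration only. `env` defaults to the
    environment of the current process.
    """
    env = os.environ if env is None else env
    path = env.get("PATH", "")
    # shutil.which returns the full path to the executable, or None
    # when it cannot be found in any directory listed in `path`.
    resolved = shutil.which(executable, path=path)
    return {"executable": executable, "path": path, "resolved": resolved}


if __name__ == "__main__":
    info = describe_shell_lookup()
    print(f"PATH={info['path']!r}")
    print(f"{info['executable']} resolves to: {info['resolved']}")
```

Calling `describe_shell_lookup()` at the top of the flow and printing the result, once when running the script manually and once through the deployment, would show whether `bash` resolves in both cases and whether the two `PATH` values differ.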