Thread
#prefect-community
    Mickael ANDRIEU

    2 months ago
    Hi everybody, do we have examples of Prefect Orion being used to manage Airbyte and dbt tasks? I have this simple need and I'm wondering whether I should use Prefect 1 or 2.
    Anna Geller

    2 months ago
    I'd say you can start directly with Prefect 2.0: the Airbyte task already exists, and the dbt task for open-source dbt will be merged and released soon as part of this PR (the dbt Cloud task has been around for quite a while already).
    from prefect import flow
    from prefect_airbyte.connections import trigger_sync
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    
    @flow
    def airbyte_dbt_flow():
        airbyte_sync = trigger_sync(
            connection_id="your-connection-id-to-sync",
            poll_interval_s=3,
            status_updates=True,
        )
        dbt_result = trigger_dbt_cli_command("dbt debug", wait_for=[airbyte_sync])
        return dbt_result
    
    
    if __name__ == "__main__":
        airbyte_dbt_flow()
    Mickael ANDRIEU

    2 months ago
    Thanks @Anna Geller! That's good enough for me, as I only have to manage Airbyte and dbt tasks (at least for this year!)
    Hi Anna, I've run into issues I'm unable to fix:
    ➜  test git:(main) ✗ python -m pip install prefect-airbyte
    Defaulting to user installation because normal site-packages is not writeable
    ERROR: Could not find a version that satisfies the requirement prefect-airbyte (from versions: 0a2, 0a3, 0b1, 0b2, 0b3, 0.0)
    ERROR: No matching distribution found for prefect-airbyte
    ➜  test git:(main) ✗ python -m pip install prefect-dbt
    Defaulting to user installation because normal site-packages is not writeable
    ERROR: Could not find a version that satisfies the requirement prefect-dbt (from versions: none)
    ERROR: No matching distribution found for prefect-dbt
    I've checked my Python version (3.9); Prefect works, and so do Airbyte and dbt.
    On Windows, this doesn't work either:
    ~ via 🐘 v8.1.7 on ☁️ (eu-west-3)
    ❯ python -m pip install prefect-dbt
    ERROR: Could not find a version that satisfies the requirement prefect-dbt (from versions: none)
    ERROR: No matching distribution found for prefect-dbt
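When pip cannot find a package, it helps to confirm which interpreter pip is actually running under and which of the relevant packages are already installed there. The sketch below is a small, hypothetical helper (the name `installed_versions` is illustrative) built only on the standard library's `importlib.metadata`:

```python
import sys
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if missing."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Which interpreter is this, and what does it already have installed?
    print("Python", sys.version.split()[0], "at", sys.executable)
    for name, version in installed_versions(["prefect", "prefect-airbyte", "prefect-dbt"]).items():
        print(f"{name}: {version or 'not installed'}")
```

Run it with the same `python` used for `python -m pip install ...`, so both report on the same environment.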
    Anna Geller

    2 months ago
    I can reproduce the same thing with dbt, but Airbyte should work. Can you try in a new (preferably Conda) virtual environment? If installing from pip doesn't work, can you install from source?
    pip install git+https://github.com/PrefectHQ/prefect-dbt
    and the same with
    pip install git+https://github.com/PrefectHQ/prefect-airbyte
    Mickael ANDRIEU

    2 months ago
    Hi Anna, thanks for the help! From source, the first one works perfectly, but I get a weird error message from the second one:
    ❯ pip install git+https://github.com/PrefectHQ/prefect-airbyte
    Collecting git+https://github.com/PrefectHQ/prefect-airbyte
      Cloning https://github.com/PrefectHQ/prefect-airbyte to c:\users\micka\appdata\local\temp\pip-req-build-xrdmps4h
      Running command git clone --filter=blob:none --quiet https://github.com/PrefectHQ/prefect-airbyte 'C:\Users\micka\AppData\Local\Temp\pip-req-build-xrdmps4h'
      Resolved https://github.com/PrefectHQ/prefect-airbyte to commit 2889713b13c2d3516fccfb7039eabe0534b3ed98
      Preparing metadata (setup.py) ... error
      error: subprocess-exited-with-error
    
      × python setup.py egg_info did not run successfully.
      │ exit code: 1
      ╰─> [8 lines of output]
          Traceback (most recent call last):
            File "<string>", line 2, in <module>
            File "<pip-setuptools-caller>", line 34, in <module>
            File "C:\Users\micka\AppData\Local\Temp\pip-req-build-xrdmps4h\setup.py", line 12, in <module>
              readme = readme_file.read()
            File "C:\tools\python3\lib\encodings\cp1252.py", line 23, in decode
              return codecs.charmap_decode(input,self.errors,decoding_table)[0]
          UnicodeDecodeError: 'charmap' codec can't decode byte 0x9d in position 1253: character maps to <undefined>
          [end of output]
    
      note: This error originates from a subprocess, and is likely not a problem with pip.
    error: metadata-generation-failed
    
    × Encountered error while generating package metadata.
    ╰─> See above for output.
    
    note: This is an issue with the package mentioned above, not pip.
    But this is probably something I can solve myself; I'll look into it on my own before wasting your time! Have a nice day
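The traceback points at a likely cause: setup.py (line 12) reads the README apparently without specifying an encoding, so on this Windows machine Python decodes it as cp1252 and fails on byte 0x9d, which cp1252 leaves undefined. A UTF-8 character such as a closing curly quote (U+201D, encoded as E2 80 9D) produces exactly that byte. The sketch below reproduces the same kind of error and shows the usual fix of passing `encoding="utf-8"` to `open()`; it is an illustration under that assumption, not the package's actual setup.py. As a local workaround, enabling Python's UTF-8 mode before installing (`set PYTHONUTF8=1` in cmd, or `$env:PYTHONUTF8=1` in PowerShell) should make `open()` default to UTF-8 in pip's build subprocess too.

```python
import os
import tempfile


def read_readme(path: str) -> str:
    # Explicit encoding: without it, open() falls back to the locale's
    # preferred encoding, which is cp1252 on many Windows machines.
    with open(path, encoding="utf-8") as readme_file:
        return readme_file.read()


def reproduce() -> None:
    text = "Uses \u201ccurly quotes\u201d like many READMEs\n"
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "README.md")
        with open(path, "w", encoding="utf-8") as f:
            f.write(text)
        try:
            # U+201D is b"\xe2\x80\x9d" in UTF-8, and 0x9d is undefined in cp1252
            with open(path, encoding="cp1252") as f:
                f.read()
        except UnicodeDecodeError as exc:
            print("cp1252 read failed:", exc)
        print("utf-8 read ok:", read_readme(path).strip())


if __name__ == "__main__":
    reproduce()
```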
    Anna Geller

    2 months ago
    awesome, have a great day too!
    Mickael ANDRIEU

    2 months ago
    Hi Anna! I've run into an error I don't know how to fix:
    from prefect import flow
    from prefect_airbyte.connections import trigger_sync
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    
    @flow
    def airbyte_dbt_flow():
        airbyte_sync = trigger_sync(
            airbyte_server_host="localhost",
            airbyte_server_port=8000,
            connection_id="828224ea-4052-41a5-85ba-7b5708214088",
            poll_interval_s=3,
            status_updates=True,
        )
    
        dbt_result = trigger_dbt_cli_command(
            command="dbt run",
            project_dir="C:\\Users\\micka\\Projects\\article_1\\",
            wait_for=[airbyte_sync]
        )
    
        return dbt_result
    
    
    if __name__ == "__main__":
        airbyte_dbt_flow()
    The Airbyte sync works as expected, but Prefect hits an error when it starts the dbt operation. In the Prefect Queue stack trace I get the following: do you have any idea why it doesn't work as expected? Thanks for your help 🙏
    Anna Geller

    2 months ago
    Interesting, so the Airbyte sync finishes just fine and only dbt can't finish? I'd love to reproduce this. Can you confirm you're using the latest version from the main branch? https://github.com/PrefectHQ/prefect-dbt
    Mickael ANDRIEU

    2 months ago
    Yes, and yes 🙂
    Of course, the Airbyte task is working fine (I've checked in the Airbyte UI).
    dbt and Airbyte share the same destination (BigQuery), with the same credentials.
    Anna Geller

    2 months ago
    gotcha, will try to reproduce
    I couldn't reproduce it, but I didn't explicitly set the path to the profiles directory.
    from prefect import flow, task, get_run_logger
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    
    @task
    def airbyte_sync_fake():
        logger = get_run_logger()
        logger.info("Running Airbyte sync...")
    
    
    @flow
    def trigger_dbt_cli_command_flow():
        airb_task = airbyte_sync_fake()
        result = trigger_dbt_cli_command(
            "dbt debug", wait_for=[airb_task]
        )
        return result
    
    
    if __name__ == "__main__":
        trigger_dbt_cli_command_flow()
    Could you try upgrading to the latest Prefect version? It's currently 2.0b13. Also, if you run your flow from the directory that contains your dbt_project.yml (or from one of its subdirectories), you can skip project_dir and it should work:
    project_dir: The directory to search for the dbt_project.yml file.
        Default is the current working directory and its parents.
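The lookup that docstring describes can be sketched in plain Python: start at the working directory and walk up through its parents until a dbt_project.yml turns up. `find_dbt_project_dir` is a hypothetical helper for illustration, not dbt's or prefect-dbt's code. It shows why the default only works when the flow runs from the project directory or one of its subdirectories.

```python
from pathlib import Path
from typing import Optional


def find_dbt_project_dir(start: Optional[Path] = None) -> Optional[Path]:
    """Return the first directory, starting at `start` (default: cwd) and
    walking up through its parents, that contains a dbt_project.yml file."""
    start = Path(start or Path.cwd()).resolve()
    for directory in (start, *start.parents):
        if (directory / "dbt_project.yml").is_file():
            return directory
    return None  # nothing found between `start` and the filesystem root


if __name__ == "__main__":
    print(find_dbt_project_dir())
```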
    I suspect upgrading both prefect and prefect-dbt to their latest versions will already solve it.
    Mickael ANDRIEU

    2 months ago
    Hi Anna, I've upgraded Prefect to 2.0b13:
    ❯ prefect --version
    2.0b13
    It's still broken, with the same stack trace ("Event loop is closed").
    Since my dbt command triggers a warning from dbt, I'm wondering whether it gets converted into an exception?
    ❯ dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1
    13:06:19  Running with dbt=1.0.4
    13:06:19  [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
    There are 1 unused configuration paths:
    - models.marts.core
    
    13:06:19  Found 3 models, 0 tests, 0 snapshots, 0 analyses, 433 macros, 0 operations, 0 seed files, 228 sources, 0 exposures, 0 metrics
    13:06:19
    13:06:20  Concurrency: 4 threads (target='dev')
    13:06:20
    13:06:20  1 of 3 START view model data_warehouse.stg_dema1n__users........................ [RUN]
    13:06:20  2 of 3 START view model data_warehouse.stg_dema1n__volunteers................... [RUN]
    13:06:20  3 of 3 START view model data_warehouse.stg_dema1n__youngs....................... [RUN]
    13:06:21  2 of 3 OK created view model data_warehouse.stg_dema1n__volunteers.............. [OK in 0.97s]
    13:06:21  3 of 3 OK created view model data_warehouse.stg_dema1n__youngs.................. [OK in 0.98s]
    13:06:21  1 of 3 OK created view model data_warehouse.stg_dema1n__users................... [OK in 0.99s]
    13:06:21
    13:06:21  Finished running 3 view models in 1.65s.
    13:06:21
    13:06:21  Completed successfully
    13:06:21
    13:06:21  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
    It would be nice (if it doesn't already exist somewhere) to provide a minimal sample with Docker images for, let's say, a MySQL or PostgreSQL database, Airbyte, and Prefect, to illustrate how all these tools can work together ❀️
    Something like: MySQL container <-> Airbyte sync/import <-> minimal dbt transformation (like renaming/casting a column) <-> Postgres container, all scheduled and managed by Prefect.
    Anna Geller

    2 months ago
    I totally understand what you mean about a Docker Compose setup. I suspect you're right, as this seems to be a Windows-related issue: I ran it on a Mac, which is likely why I couldn't reproduce it.
    Any chance you could run it on WSL to check whether the issue only happens on Windows?
    Mickael ANDRIEU

    2 months ago
    Hmm, I'll try to dockerize everything, and if it works I may contribute a template. Is that something Prefect could accept? I used to be an open source contributor when I was a developer (PrestaShop, Akeneo, Symfony), but since I've moved to data topics I'm not "that" confident 😄
    We really need some starter templates that "just" work 🙂
    Anna Geller

    2 months ago
    We'd totally accept a contribution here, and I fully endorse a docker-compose file that just works! You can use this as a starting point, and once you contribute, I'd be happy to add it to this Discourse section: https://discourse.prefect.io/t/how-to-self-host-prefect-2-0-orchestration-layer-list-of-resources-to-get-started/952#docker-compose-4 Thanks so much!
    Andrew Huang

    2 months ago
    Hi @Mickael ANDRIEU, can you help me debug with this code? I rewrote it as async to see whether the event loop still closes.
    import asyncio
    from prefect import flow
    from prefect_airbyte.connections import trigger_sync
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    
    @flow
    async def airbyte_dbt_flow():
        airbyte_sync = await trigger_sync(
            airbyte_server_host="localhost",
            airbyte_server_port=8000,
            connection_id="828224ea-4052-41a5-85ba-7b5708214088",
            poll_interval_s=3,
            status_updates=True,
        )
    
        dbt_result = await trigger_dbt_cli_command(
            command="dbt run",
            project_dir="C:\\Users\\micka\\Projects\\article_1\\",
            wait_for=[airbyte_sync]
        )
    
        return dbt_result
    
    
    if __name__ == "__main__":
        asyncio.run(airbyte_dbt_flow())
    Mickael ANDRIEU

    2 months ago
    Sure Andrew, here is the new stack trace output (failure)
    Andrew Huang

    2 months ago
    Can you run
    dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1
    manually from your command line, or from plain Python like this:
    import subprocess
    
    # raw string so the backslashes in the Windows paths are kept literally
    subprocess.run(r"dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1")
    Also thanks for debugging with me!
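A slightly more informative version of that plain-Python check captures dbt's output and exit code, which shows whether dbt itself treated the run as a failure. dbt exits with a nonzero code when a run fails, whereas a run that prints a [WARNING] but ends with "Completed successfully" should still exit with 0. `run_and_capture` and `DBT_RUN_ARGS` are illustrative names. Passing a list of arguments instead of one string also sidesteps shell quoting.

```python
import subprocess
import sys
from typing import List, Tuple

# The dbt invocation from the thread, as an argument list. Raw strings keep the
# Windows backslashes literal; in a normal string "\U" starts an escape sequence.
DBT_RUN_ARGS = [
    "dbt", "run",
    "--profiles-dir", r"C:\Users\micka\.dbt",
    "--project-dir", r"C:\Users\micka\Projects\article_1",
]


def run_and_capture(args: List[str]) -> Tuple[int, str]:
    """Run a command without a shell and return (exit code, stdout + stderr)."""
    completed = subprocess.run(args, capture_output=True, text=True)
    return completed.returncode, completed.stdout + completed.stderr


if __name__ == "__main__":
    # Portable smoke test; on the affected machine, try run_and_capture(DBT_RUN_ARGS).
    code, output = run_and_capture([sys.executable, "-c", "print('hello from a subprocess')"])
    print(output, "exit code:", code)
```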
    Mickael ANDRIEU

    2 months ago
    Andrew Huang

    2 months ago
    Thanks! Can you put it in a flow now?
    import subprocess

    from prefect import flow, task


    @task
    def test():
        # raw string so the backslashes in the Windows paths are kept literally
        subprocess.run(r"dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1")


    @flow
    def test_flow():
        test()


    test_flow()
    If that works, let’s wrap it with prefect_shell too:
    from prefect import flow
    from prefect_shell import shell_run_command


    @flow
    def test_flow():
        shell_run_command(r"dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1")


    test_flow()
    Finally, if that works too, let’s go back to using prefect_dbt on its own:
    import asyncio
    from prefect import flow
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    
    @flow
    async def airbyte_dbt_flow():
        dbt_result = await trigger_dbt_cli_command(
            command="dbt run",
            project_dir="C:\\Users\\micka\\Projects\\article_1\\"
        )
    
        return dbt_result
    
    
    if __name__ == "__main__":
        asyncio.run(airbyte_dbt_flow())
    If all of these work, I suspect wait_for has an issue.
    Mickael ANDRIEU

    2 months ago
    Sorry, I have to leave now; I'll come back tomorrow with all the test results 😕
    Andrew Huang

    2 months ago
    sure~ no rush. thank you so much for your help
    Mickael ANDRIEU

    2 months ago
    thank you ^^
    With subprocess (I had to escape the backslashes in the paths):
    And for the last test (no wait_for):
    Andrew Huang

    2 months ago
    wow thanks!
    Does this work for you? (no rush at all, take your time)
    from prefect import flow
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    from prefect_shell import shell_run_command


    @flow
    def test_flow():
        # sync flow, so no await here
        dbt_result = trigger_dbt_cli_command(
            command="dbt run",
            project_dir="C:\\Users\\micka\\Projects\\article_1\\"
        )
        shell_run_command("pip list")  # or any other valid Windows shell command


    test_flow()
    My guess is that when you have multiple tasks and one of them fails for any reason, the event loop closes and the original traceback gets lost, but the flow still tries to continue. Since the event loop is closed, it can't, so it raises the "Event loop is closed" error instead.
    However, I think this only applies to Windows. To confirm that theory, would you be willing to run the following?
    test_fail.py:
    import time
    
    time.sleep(3)
    
    import test_fail
    raise ImportError
    from prefect import flow, task
    from prefect_shell.commands import shell_run_command
    
    
    @task
    async def mimic_dbt_fail():
        await shell_run_command.fn("python test_fail.py")
    
    
    @flow
    def test_flow():
        shell_run_command("sleep 3 && echo mimics_airbyte")
        mimic_dbt_fail()
    
    
    test_flow()
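Andrew's theory rests on a general asyncio rule: once an event loop is closed, any attempt to schedule more work on it raises `RuntimeError("Event loop is closed")`. The simplified, hypothetical sketch below uses plain asyncio, not Prefect's internals, to show how that second error can end up being the only one visible when the first failure is swallowed.

```python
import asyncio


async def failing_step() -> None:
    # Stands in for the dbt task failing for some unrelated reason.
    raise ImportError("original failure")


def run_steps() -> str:
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(failing_step())
    except ImportError:
        pass  # hypothetical: the original traceback is swallowed here
    finally:
        loop.close()
    try:
        # The "flow" tries to carry on with the next step on the same loop.
        loop.call_soon(print, "next step")
    except RuntimeError as exc:
        return f"{type(exc).__name__}: {exc}"
    return "next step scheduled"


if __name__ == "__main__":
    print(run_steps())
```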
    Mickael ANDRIEU

    2 months ago
    Hi Andrew! I'll test it tomorrow and come back here with feedback. Thanks a lot for your time and help, it's really appreciated 🙏 (even if my English is terrible 🇨🇵). Have a nice day/night!
    Hi, I've tested on Linux Mint with Python 3.9 and all packages upgraded, and I still have an issue:
    from prefect import flow
    from prefect_airbyte.connections import trigger_sync
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    
    @flow
    def airbyte_dbt_flow():
        airbyte_sync = trigger_sync(
            airbyte_server_host="localhost",
            airbyte_server_port=8000,
            connection_id="56ccaf30-5f07-49cb-8013-cebe91434137"
        )
         
        dbt_result = trigger_dbt_cli_command("dbt debug", wait_for=[airbyte_sync])
        return dbt_result
    
    
    if __name__ == "__main__":
        airbyte_dbt_flow()
    I don't think dbt is the issue, because if I execute only the dbt command, without any previous operation (and without wait_for), it works as expected.
    The issue is in the flow system: my sync tasks are quite heavy (~20 min). Do I have to configure trigger_sync to force it to wait?
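Judging by its `poll_interval_s` and `status_updates` parameters, `trigger_sync` already waits for the sync by polling the job status. The general pattern is "poll until a terminal status, with an overall deadline"; the sketch below is a hypothetical, generic version of it (`poll_until_done`, the status strings, and `timeout_s` are illustrative assumptions, not prefect-airbyte's API). A long sync only completes if every timeout wrapped around such a loop is longer than the sync itself.

```python
import time
from typing import Callable

# Hypothetical terminal job states, loosely modeled on Airbyte job statuses.
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}


def poll_until_done(
    get_status: Callable[[], str],
    poll_interval_s: float = 3.0,
    timeout_s: float = 3600.0,
) -> str:
    """Call get_status() until it returns a terminal status or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_s} s")
        time.sleep(poll_interval_s)


if __name__ == "__main__":
    # Simulated job that needs a few polls before it finishes.
    statuses = iter(["pending", "running", "running", "succeeded"])
    print(poll_until_done(lambda: next(statuses), poll_interval_s=0.1))
```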
    Andrew Huang

    1 month ago
    Okay, that’s really helpful to know: it doesn’t work on Linux either. I’ll try to reproduce it soon!
    Hi, we’ve increased the timeout. Would you be willing to help us test using the latest Prefect from the GitHub main branch?
    pip install git+https://github.com/PrefectHQ/prefect@main