m

Mickael ANDRIEU

07/15/2022, 8:11 AM
Hi everybody, do we have examples of Prefect Orion being used to manage Airbyte and dbt tasks? I have this simple need and I'm wondering whether I should use Prefect 1 or 2.
✅ 1
a

Anna Geller

07/15/2022, 10:12 AM
I'd say you can start directly with Prefect 2.0: the Airbyte task already exists, and the dbt task for the OSS version will be merged and released soon as part of this PR (the dbt Cloud task has been around for quite a while already)
Copy code
from prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
def airbyte_dbt_flow():
    airbyte_sync = trigger_sync(
        connection_id="your-connection-id-to-sync",
        poll_interval_s=3,
        status_updates=True,
    )
    dbt_result = trigger_dbt_cli_command("dbt debug", wait_for=[airbyte_sync])
    return dbt_result


if __name__ == "__main__":
    airbyte_dbt_flow()
👍 1
all of those are packaged into collections: • https://github.com/PrefectHQ/prefect-dbt • https://github.com/PrefectHQ/prefect-airbyte
👍 1
m

Mickael ANDRIEU

07/15/2022, 1:44 PM
Thanks @Anna Geller! That's good enough for me, as I only have to manage Airbyte and dbt tasks (at least for this year!)
🙌 1
Hi Anna, I've run into issues I'm unable to fix:
Copy code
➜  test git:(main) ✗ python -m pip install prefect-airbyte
Defaulting to user installation because normal site-packages is not writeable
ERROR: Could not find a version that satisfies the requirement prefect-airbyte (from versions: 0a2, 0a3, 0b1, 0b2, 0b3, 0.0)
ERROR: No matching distribution found for prefect-airbyte
╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
➜  test git:(main) ✗ python -m pip install prefect-dbt
Defaulting to user installation because normal site-packages is not writeable
ERROR: Could not find a version that satisfies the requirement prefect-dbt (from versions: none)
ERROR: No matching distribution found for prefect-dbt
I've checked my Python version (3.9); Prefect is working, and so are Airbyte and dbt
On Windows, this doesn't work either: `` ~ via 🐘 v8.1.7 on ☁️ (eu-west-3) ❯ python -m pip install prefect-dbt ERROR: Could not find a version that satisfies the requirement prefect-dbt (from versions: none) ERROR: No matching distribution found for prefect-dbt ``
a

Anna Geller

07/18/2022, 11:08 AM
I can replicate the same with dbt but Airbyte should work - can you try in a new (preferably Conda) virtual env? if installing from pip doesn't work, can you install from source?
Copy code
pip install git+https://github.com/PrefectHQ/prefect-dbt
same with
Copy code
pip install git+https://github.com/PrefectHQ/prefect-airbyte
m

Mickael ANDRIEU

07/18/2022, 11:55 AM
Hi Anna, thanks for the help! From source, the first one works perfectly, but I get a weird error message from the second one:
Copy code
❯ pip install git+https://github.com/PrefectHQ/prefect-airbyte
Collecting git+https://github.com/PrefectHQ/prefect-airbyte
  Cloning https://github.com/PrefectHQ/prefect-airbyte to c:\users\micka\appdata\local\temp\pip-req-build-xrdmps4h
  Running command git clone --filter=blob:none --quiet https://github.com/PrefectHQ/prefect-airbyte 'C:\Users\micka\AppData\Local\Temp\pip-req-build-xrdmps4h'
  Resolved https://github.com/PrefectHQ/prefect-airbyte to commit 2889713b13c2d3516fccfb7039eabe0534b3ed98
  Preparing metadata (setup.py) ... error
  error: subprocess-exited-with-error

  × python setup.py egg_info did not run successfully.
  │ exit code: 1
  ╰─> [8 lines of output]
      Traceback (most recent call last):
        File "<string>", line 2, in <module>
        File "<pip-setuptools-caller>", line 34, in <module>
        File "C:\Users\micka\AppData\Local\Temp\pip-req-build-xrdmps4h\setup.py", line 12, in <module>
          readme = readme_file.read()
        File "C:\tools\python3\lib\encodings\cp1252.py", line 23, in decode
          return codecs.charmap_decode(input,self.errors,decoding_table)[0]
      UnicodeDecodeError: 'charmap' codec can't decode byte 0x9d in position 1253: character maps to <undefined>
      [end of output]

  note: This error originates from a subprocess, and is likely not a problem with pip.
error: metadata-generation-failed

× Encountered error while generating package metadata.
╰─> See above for output.

note: This is an issue with the package mentioned above, not pip.
But this is probably something I can solve myself. I'll take a look on my own before wasting your time! Have a nice day
🙌 1
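For readers hitting the same traceback: it points at `setup.py` reading the README with the Windows default encoding (cp1252), in which byte 0x9d has no mapping. Below is a minimal sketch of that failure and of the usual remedy, which is to pass an explicit `encoding="utf-8"` when reading the file. The README text is made up for illustration and is not the actual prefect-airbyte file.

```python
import tempfile
from pathlib import Path

# A right double quotation mark (U+201D) is encoded in UTF-8 as bytes E2 80 9D.
readme_text = "Prefect \u201ccollections\u201d"

with tempfile.TemporaryDirectory() as tmp:
    readme = Path(tmp) / "README.md"
    readme.write_bytes(readme_text.encode("utf-8"))

    # Reading with cp1252 (what open() defaults to on many Windows setups) fails,
    # because byte 0x9d is undefined in that code page.
    try:
        readme.read_text(encoding="cp1252")
        cp1252_error = None
    except UnicodeDecodeError as exc:
        cp1252_error = exc

    # Reading with an explicit UTF-8 encoding succeeds.
    utf8_text = readme.read_text(encoding="utf-8")

print(cp1252_error)
print(utf8_text == readme_text)
```

Setting the environment variable `PYTHONUTF8=1` (Python 3.7+) before running pip is another way to make UTF-8 the default encoding; whether it resolves this particular install was not confirmed in the thread.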
a

Anna Geller

07/18/2022, 11:58 AM
awesome, have a great day too!
m

Mickael ANDRIEU

07/26/2022, 9:47 AM
Hi Anna! I've hit an error I don't know how to fix:
Copy code
from prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
def airbyte_dbt_flow():
    airbyte_sync = trigger_sync(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        connection_id="828224ea-4052-41a5-85ba-7b5708214088",
        poll_interval_s=3,
        status_updates=True,
    )

    dbt_result = trigger_dbt_cli_command(
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\",
        wait_for=[airbyte_sync]
    )

    return dbt_result


if __name__ == "__main__":
    airbyte_dbt_flow()
The Airbyte sync operation works as expected, but Prefect encounters an error when it starts the dbt operations. In the Prefect Queue stack trace I have the following: do you have any idea why it doesn't work as expected? Thanks for your help 🙏
a

Anna Geller

07/26/2022, 12:14 PM
interesting, so the Airbyte sync finishes just fine, only dbt can't finish? I'd love to reproduce - can you confirm you are using the latest version from this main branch? https://github.com/PrefectHQ/prefect-dbt
m

Mickael ANDRIEU

07/26/2022, 12:15 PM
yes, and yes 🙂
ofc the airbyte task is working fine (i've checked through airbyte UI)
dbt and airbyte share the same destination for the operations (BigQuery) with the same credentials
a

Anna Geller

07/26/2022, 12:22 PM
gotcha, will try to reproduce
I couldn't reproduce but I didn't explicitly set path to the profile though
Copy code
from prefect import flow, task, get_run_logger
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@task
def airbyte_sync_fake():
    logger = get_run_logger()
    logger.info("Running Airbyte sync...")


@flow
def trigger_dbt_cli_command_flow():
    airb_task = airbyte_sync_fake()
    result = trigger_dbt_cli_command(
        "dbt debug", wait_for=[airb_task]
    )
    return result


if __name__ == "__main__":
    trigger_dbt_cli_command_flow()
can you try upgrading to the latest Prefect version? it's currently 2.0b13
if your dbt_project.yml is in the directory you run the flow from, or in one of its parent directories, then you can skip project_dir and it should work
Copy code
project_dir: The directory to search for the dbt_project.yml file.
    Default is the current working directory and its parents.
I suspect upgrading to the latest version of both prefect and prefect-dbt will already solve it
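To make the quoted docstring concrete ("the current working directory and its parents"), here is a small standard-library sketch of that kind of upward search. `find_dbt_project_dir` is a hypothetical helper written for illustration; it is not the prefect-dbt or dbt implementation, and the directory layout is invented.

```python
import tempfile
from pathlib import Path
from typing import Optional


def find_dbt_project_dir(start: Path) -> Optional[Path]:
    """Return the first directory, from `start` upwards, that holds dbt_project.yml."""
    start = start.resolve()
    for candidate in (start, *start.parents):
        if (candidate / "dbt_project.yml").is_file():
            return candidate
    return None


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / "dbt_project.yml").write_text("name: demo\n")
    nested = root / "flows" / "daily"  # hypothetical place where a flow is run from
    nested.mkdir(parents=True)

    found_from_nested = find_dbt_project_dir(nested)
    found_from_root = find_dbt_project_dir(root)

print(found_from_nested == root, found_from_root == root)
```

Under this reading, running the flow from the project root or from any directory below it would locate the project without an explicit `project_dir`.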
m

Mickael ANDRIEU

07/26/2022, 1:05 PM
Hi Anna, I've upgraded Prefect to 2.0b13 (prefect --version prints 2.0b13). It's still broken, with the same stack trace ("Event loop is closed")
As my dbt command triggers a warning from dbt, I'm wondering whether that warning is being converted into an exception?
Copy code
❯ dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1
13:06:19  Running with dbt=1.0.4
13:06:19  [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 1 unused configuration paths:
- models.marts.core

13:06:19  Found 3 models, 0 tests, 0 snapshots, 0 analyses, 433 macros, 0 operations, 0 seed files, 228 sources, 0 exposures, 0 metrics
13:06:19
13:06:20  Concurrency: 4 threads (target='dev')
13:06:20
13:06:20  1 of 3 START view model data_warehouse.stg_dema1n__users........................ [RUN]
13:06:20  2 of 3 START view model data_warehouse.stg_dema1n__volunteers................... [RUN]
13:06:20  3 of 3 START view model data_warehouse.stg_dema1n__youngs....................... [RUN]
13:06:21  2 of 3 OK created view model data_warehouse.stg_dema1n__volunteers.............. [OK in 0.97s]
13:06:21  3 of 3 OK created view model data_warehouse.stg_dema1n__youngs.................. [OK in 0.98s]
13:06:21  1 of 3 OK created view model data_warehouse.stg_dema1n__users................... [OK in 0.99s]
13:06:21
13:06:21  Finished running 3 view models in 1.65s.
13:06:21
13:06:21  Completed successfully
13:06:21
13:06:21  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
It would be nice, if it hasn't already been done somewhere, to provide a minimal sample with Docker images for, let's say, a [My|Postgre]SQL database, Airbyte and Prefect, to illustrate how all these tools can work together ❤️
Something like MySQL container <-> Airbyte Sync/Import <-> dbt minimal transformation (like renaming/casting a column) <-> Postgres Container, all scheduled and managed by Prefect
a

Anna Geller

07/26/2022, 1:18 PM
I totally understand what you mean about a Docker Compose setup. I suspect you're right, as it seems to be a Windows-related issue: I ran it on a Mac, which is likely why I couldn't reproduce it
💡 1
any chance you could run it on WSL to check if the issue is only on Windows?
m

Mickael ANDRIEU

07/26/2022, 1:24 PM
Hmm, I will try to dockerize everything, and if it works I may contribute a template: is that something Prefect could accept? I used to be an open source contributor when I was a developer (PrestaShop, Akeneo, Symfony), but since I moved to data topics I'm not "that" confident 😄
We really need some templates that "just" work as a starter 🙂
a

Anna Geller

07/26/2022, 1:26 PM
we totally accept a contribution here, I fully endorse a docker-compose file that just works! you can use this as a starting point and once you contribute, I'd be happy to add it to this Discourse section https://discourse.prefect.io/t/how-to-self-host-prefect-2-0-orchestration-layer-list-of-resources-to-get-started/952#docker-compose-4 thanks so much!
a

Andrew Huang

07/26/2022, 4:08 PM
Hi @Mickael ANDRIEU can you help me try debugging with this code; rewrote it in async to see if the event loop still closes
Copy code
import asyncio
from prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
async def airbyte_dbt_flow():
    airbyte_sync = await trigger_sync(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        connection_id="828224ea-4052-41a5-85ba-7b5708214088",
        poll_interval_s=3,
        status_updates=True,
    )

    dbt_result = await trigger_dbt_cli_command(
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\",
        wait_for=[airbyte_sync]
    )

    return dbt_result


if __name__ == "__main__":
    asyncio.run(airbyte_dbt_flow())
🙌 1
🙏 1
m

Mickael ANDRIEU

07/26/2022, 4:30 PM
Sure Andrew, here is the new stack trace output (failure)
a

Andrew Huang

07/26/2022, 4:46 PM
Can you run `dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1` manually in your command line, like
Copy code
import subprocess

subprocess.run("dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1")
Also thanks for debugging with me!
m

Mickael ANDRIEU

07/26/2022, 4:50 PM
stacktrace.txt
a

Andrew Huang

07/26/2022, 4:53 PM
Thanks! can you put it in a flow now
Copy code
import subprocess

from prefect import flow, task

@task
def test():
    subprocess.run("dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1")

@flow
def test_flow():
    test()

test_flow()
If that works, let's wrap prefect_shell too
Copy code
from prefect import flow, task
from prefect_shell import shell_run_command


@flow
def test_flow():
    shell_run_command("dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1")
Finally, if that works too, let's go back to using prefect_dbt, alone
Copy code
import asyncio
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
async def airbyte_dbt_flow():
    dbt_result = await trigger_dbt_cli_command(
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\"
    )

    return dbt_result


if __name__ == "__main__":
    asyncio.run(airbyte_dbt_flow())
If all of these work, I suspect `wait_for` has an issue
m

Mickael ANDRIEU

07/26/2022, 4:57 PM
I'm sorry, I have to leave now. I will come back tomorrow with all the test results 😕
a

Andrew Huang

07/26/2022, 4:58 PM
sure~ no rush. thank you so much for your help
m

Mickael ANDRIEU

07/26/2022, 4:58 PM
thank you ^^
With subprocess (I had to change \ to \\):
And for the last test (no wait_for):
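A note on the backslash fix mentioned above: in a normal Python string literal, `\U` begins an 8-digit Unicode escape, so a Windows path such as `C:\Users\...` fails to compile. Doubling the backslashes or using a raw string both avoid this. The sketch below uses only the standard library and compiles the path as source text, so it runs on any platform without needing dbt installed.

```python
# Source text of a snippet that uses single backslashes in a normal string literal.
bad_source = r'path = "C:\Users\micka\.dbt"'

try:
    compile(bad_source, "<snippet>", "exec")
    compiles = True
except SyntaxError:
    # "\U" starts an 8-digit Unicode escape, and "sers\mic" is not valid hex.
    compiles = False

escaped = "C:\\Users\\micka\\.dbt"  # every backslash doubled
raw = r"C:\Users\micka\.dbt"        # raw string: backslashes are taken literally

print(compiles, escaped == raw)
```

Both spellings produce the same string, so the choice between them is a matter of readability.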
a

Andrew Huang

07/26/2022, 6:07 PM
wow thanks!
Does this work for you? (no rush at all, take your time)
Copy code
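from prefect import flow, task
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_shell import shell_run_command


@flow
def test_flow():
    # No `await` here: the flow is a plain (non-async) function, as in the earlier examples
    dbt_result = trigger_dbt_cli_command(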
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\"
    )
    shell_run_command("pip list")  # or any other valid Windows shell commands
My guess is that, when you have multiple tasks and one of them fails for any reason, the event loop closes and the original traceback gets lost, but the flow still tries to continue. Because the event loop is closed, it cannot continue, so it raises the event loop error instead
However, I think this only applies for Windows. To confirm that theory, would you be willing to run the following? test_fail.py:
Copy code
import time

time.sleep(3)

import test_fail
raise ImportError
Copy code
from prefect import flow, task
from prefect_shell.commands import shell_run_command


@task
async def mimic_dbt_fail():
    await shell_run_command.fn("python test_fail.py")


@flow
def test_flow():
    shell_run_command("sleep 3 && echo mimics_airbyte")
    mimic_dbt_fail()


test_flow()
🙏 1
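The symptom reported in this thread, `RuntimeError: Event loop is closed`, is what asyncio raises when work is submitted to a loop that has already been closed. Here is a minimal standard-library sketch of just that symptom; it does not involve Prefect and does not reproduce the bug under discussion.

```python
import asyncio


async def noop() -> str:
    return "done"


loop = asyncio.new_event_loop()
first_result = loop.run_until_complete(noop())  # works while the loop is open
loop.close()

coro = noop()
try:
    loop.run_until_complete(coro)  # the loop is closed, so this is rejected
    message = None
except RuntimeError as exc:
    message = str(exc)
finally:
    coro.close()  # the coroutine never ran; close it to avoid a "never awaited" warning

print(first_result, "|", message)
```

If an earlier failure closes the loop, any later task that needs it would surface this error rather than the original one, which matches the hypothesis above.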
m

Mickael ANDRIEU

07/28/2022, 6:21 AM
Hi Andrew! I'll test it tomorrow and come back here with feedback. Thanks a lot for your time and help, really appreciated 🙏 (even if my English is terrible 🇨🇵) Have a nice day/night!
🙌 1
Hi, I've tested on Linux Mint / Python 3.9 with all packages upgraded, and I still have an issue:
Copy code
from prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
def airbyte_dbt_flow():
    airbyte_sync = trigger_sync(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        connection_id="56ccaf30-5f07-49cb-8013-cebe91434137"
    )
     
    dbt_result = trigger_dbt_cli_command("dbt debug", wait_for=[airbyte_sync])
    return dbt_result


if __name__ == "__main__":
    airbyte_dbt_flow()
I don't think dbt is the issue, because if I execute only the dbt command without any previous operation (and without `wait_for`), it works as expected
The issue is in the flow system: my sync tasks are quite heavy (~20 min). Do I have to configure trigger_sync to force it to wait?
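The thread does not settle how `trigger_sync` waits for a long sync. Purely as a generic illustration of what a poll interval combined with an overall timeout means for a ~20 minute job, here is a sketch. `poll_until_done`, its parameters, and the status strings are hypothetical and are not the prefect-airbyte implementation.

```python
import time
from typing import Callable

TERMINAL_STATES = {"succeeded", "failed"}  # hypothetical status names


def poll_until_done(
    get_status: Callable[[], str],
    poll_interval_s: float = 3.0,
    timeout_s: float = 30 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_status() repeatedly until it reports a terminal state."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_s} seconds")
        sleep(poll_interval_s)


# Simulated job that reports a terminal state on the fourth poll.
statuses = iter(["pending", "running", "running", "succeeded"])
naps = []  # record the requested sleeps instead of actually sleeping
final_status = poll_until_done(lambda: next(statuses), sleep=naps.append)
print(final_status, len(naps))
```

The point of the sketch is that the overall timeout has to exceed the longest expected sync, independently of how often the status is polled.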
a

Andrew Huang

07/29/2022, 4:49 PM
Okay, it's really helpful to know that it doesn't work on Linux either. I'll try to reproduce soon!
Hi, we've increased the timeout seconds. Would you be willing to help us test using the latest Prefect on GitHub main?
pip install git+https://github.com/PrefectHQ/prefect@main
🙏 1