Mickael ANDRIEU
07/15/2022, 8:11 AMAnna Geller
from prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
def airbyte_dbt_flow():
    """Trigger an Airbyte connection sync, then run ``dbt debug``.

    Returns:
        Whatever ``trigger_dbt_cli_command`` returns for the ``dbt debug`` call.
    """
    sync_run = trigger_sync(
        connection_id="your-connection-id-to-sync",
        poll_interval_s=3,
        status_updates=True,
    )
    # The sync result is handed to the dbt task through wait_for only; it is
    # not an argument of the dbt command itself.
    return trigger_dbt_cli_command("dbt debug", wait_for=[sync_run])


if __name__ == "__main__":
    airbyte_dbt_flow()
Mickael ANDRIEU
07/15/2022, 1:44 PM➜ test git:(main) ✗ python -m pip install prefect-airbyte
Defaulting to user installation because normal site-packages is not writeable
ERROR: Could not find a version that satisfies the requirement prefect-airbyte (from versions: 0a2, 0a3, 0b1, 0b2, 0b3, 0.0)
ERROR: No matching distribution found for prefect-airbyte
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β test git:(main) β python -m pip install prefect-dbt
Defaulting to user installation because normal site-packages is not writeable
ERROR: Could not find a version that satisfies the requirement prefect-dbt (from versions: none)
ERROR: No matching distribution found for prefect-dbt
Anna Geller
pip install git+https://github.com/PrefectHQ/prefect-dbt
pip install git+https://github.com/PrefectHQ/prefect-airbyte
Mickael ANDRIEU
07/18/2022, 11:55 AM❯ pip install git+https://github.com/PrefectHQ/prefect-airbyte
Collecting git+https://github.com/PrefectHQ/prefect-airbyte
Cloning https://github.com/PrefectHQ/prefect-airbyte to c:\users\micka\appdata\local\temp\pip-req-build-xrdmps4h
Running command git clone --filter=blob:none --quiet https://github.com/PrefectHQ/prefect-airbyte 'C:\Users\micka\AppData\Local\Temp\pip-req-build-xrdmps4h'
Resolved https://github.com/PrefectHQ/prefect-airbyte to commit 2889713b13c2d3516fccfb7039eabe0534b3ed98
Preparing metadata (setup.py) ... error
error: subprocess-exited-with-error
× python setup.py egg_info did not run successfully.
│ exit code: 1
╰─> [8 lines of output]
Traceback (most recent call last):
File "<string>", line 2, in <module>
File "<pip-setuptools-caller>", line 34, in <module>
File "C:\Users\micka\AppData\Local\Temp\pip-req-build-xrdmps4h\setup.py", line 12, in <module>
readme = readme_file.read()
File "C:\tools\python3\lib\encodings\cp1252.py", line 23, in decode
return codecs.charmap_decode(input,self.errors,decoding_table)[0]
UnicodeDecodeError: 'charmap' codec can't decode byte 0x9d in position 1253: character maps to <undefined>
[end of output]
note: This error originates from a subprocess, and is likely not a problem with pip.
error: metadata-generation-failed
× Encountered error while generating package metadata.
╰─> See above for output.
note: This is an issue with the package mentioned above, not pip.
Anna Geller
Mickael ANDRIEU
07/26/2022, 9:47 AMfrom prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
def airbyte_dbt_flow():
    """Sync an Airbyte connection on localhost:8000, then run ``dbt run``.

    Returns:
        Whatever ``trigger_dbt_cli_command`` returns for the ``dbt run`` call.
    """
    sync_run = trigger_sync(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        connection_id="828224ea-4052-41a5-85ba-7b5708214088",
        poll_interval_s=3,
        status_updates=True,
    )
    # The sync result is passed through wait_for only; dbt receives no data
    # from it.
    return trigger_dbt_cli_command(
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\",
        wait_for=[sync_run],
    )


if __name__ == "__main__":
    airbyte_dbt_flow()
Anna Geller
Mickael ANDRIEU
07/26/2022, 12:15 PMAnna Geller
from prefect import flow, task, get_run_logger
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@task
def airbyte_sync_fake():
    """Placeholder task: log one message instead of running an Airbyte sync."""
    logger = get_run_logger()
    logger.info("Running Airbyte sync...")
@flow
def trigger_dbt_cli_command_flow():
    """Run the placeholder sync task, then ``dbt debug`` with it in ``wait_for``.

    Returns:
        Whatever ``trigger_dbt_cli_command`` returns for the ``dbt debug`` call.
    """
    fake_sync = airbyte_sync_fake()
    return trigger_dbt_cli_command("dbt debug", wait_for=[fake_sync])


if __name__ == "__main__":
    trigger_dbt_cli_command_flow()
project_dir: The directory to search for the dbt_project.yml file.
Default is the current working directory and its parents.
Mickael ANDRIEU
07/26/2022, 1:05 PM❯ dbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1
13:06:19 Running with dbt=1.0.4
13:06:19 [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 1 unused configuration paths:
- models.marts.core
13:06:19 Found 3 models, 0 tests, 0 snapshots, 0 analyses, 433 macros, 0 operations, 0 seed files, 228 sources, 0 exposures, 0 metrics
13:06:19
13:06:20 Concurrency: 4 threads (target='dev')
13:06:20
13:06:20 1 of 3 START view model data_warehouse.stg_dema1n__users........................ [RUN]
13:06:20 2 of 3 START view model data_warehouse.stg_dema1n__volunteers................... [RUN]
13:06:20 3 of 3 START view model data_warehouse.stg_dema1n__youngs....................... [RUN]
13:06:21 2 of 3 OK created view model data_warehouse.stg_dema1n__volunteers.............. [OK in 0.97s]
13:06:21 3 of 3 OK created view model data_warehouse.stg_dema1n__youngs.................. [OK in 0.98s]
13:06:21 1 of 3 OK created view model data_warehouse.stg_dema1n__users................... [OK in 0.99s]
13:06:21
13:06:21 Finished running 3 view models in 1.65s.
13:06:21
13:06:21 Completed successfully
13:06:21
13:06:21 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
Anna Geller
Mickael ANDRIEU
07/26/2022, 1:24 PMAnna Geller
Andrew Huang
07/26/2022, 4:08 PMimport asyncio
from prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
async def airbyte_dbt_flow():
    """Await an Airbyte sync on localhost:8000, then await ``dbt run``.

    Returns:
        The awaited result of the ``trigger_dbt_cli_command`` call.
    """
    sync_result = await trigger_sync(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        connection_id="828224ea-4052-41a5-85ba-7b5708214088",
        poll_interval_s=3,
        status_updates=True,
    )
    # The awaited sync result is still listed in wait_for, as in the
    # non-async variants of this flow.
    return await trigger_dbt_cli_command(
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\",
        wait_for=[sync_result],
    )


if __name__ == "__main__":
    asyncio.run(airbyte_dbt_flow())
Mickael ANDRIEU
07/26/2022, 4:30 PMAndrew Huang
07/26/2022, 4:46 PMdbt run --profiles-dir C:\Users\micka\.dbt --project-dir C:\Users\micka\Projects\article_1
manually in your command line, like
import subprocess

# Raw strings keep the Windows backslashes literal. In a regular literal "\U"
# starts a \UXXXXXXXX escape (a syntax error here) and "\a" would become BEL.
# The command is passed as an argument list so no shell parsing is involved.
subprocess.run(
    [
        "dbt",
        "run",
        "--profiles-dir",
        r"C:\Users\micka\.dbt",
        "--project-dir",
        r"C:\Users\micka\Projects\article_1",
    ]
)
Also thanks for debugging with me!Mickael ANDRIEU
07/26/2022, 4:50 PMAndrew Huang
07/26/2022, 4:53 PMfrom prefect import flow, task
@task
def test():
    """Run ``dbt run`` for the article_1 project in a child process."""
    # Imported locally because this snippet never imports subprocess itself.
    import subprocess

    # Raw strings keep the Windows backslashes literal. In a regular literal
    # "\U" starts a \UXXXXXXXX escape (a syntax error here) and "\a" would
    # become BEL.
    subprocess.run(
        [
            "dbt",
            "run",
            "--profiles-dir",
            r"C:\Users\micka\.dbt",
            "--project-dir",
            r"C:\Users\micka\Projects\article_1",
        ]
    )


@flow
def test_flow():
    """Flow that calls the ``test`` task once."""
    test()


test_flow()
from prefect import flow, task
from prefect_shell import shell_run_command
@flow
def test_flow():
    """Run ``dbt run`` for the article_1 project through ``shell_run_command``."""
    # Raw strings keep the Windows backslashes literal. In a regular literal
    # "\U" starts a \UXXXXXXXX escape (a syntax error here) and "\a" would
    # become BEL. The two pieces concatenate into a single command string.
    shell_run_command(
        r"dbt run --profiles-dir C:\Users\micka\.dbt "
        r"--project-dir C:\Users\micka\Projects\article_1"
    )
import asyncio
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
async def airbyte_dbt_flow():
    """Await ``dbt run`` for the article_1 project, with no Airbyte step.

    Returns:
        The awaited result of the ``trigger_dbt_cli_command`` call.
    """
    return await trigger_dbt_cli_command(
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\",
    )


if __name__ == "__main__":
    asyncio.run(airbyte_dbt_flow())
wait_for
has an issueMickael ANDRIEU
07/26/2022, 4:57 PMAndrew Huang
07/26/2022, 4:58 PMMickael ANDRIEU
07/26/2022, 4:58 PMAndrew Huang
07/26/2022, 6:07 PMfrom prefect import flow, task
from prefect_shell import shell_run_command
@flow
def test_flow():
    """Run ``dbt run`` for the article_1 project, then a plain shell command."""
    # This snippet imports only prefect and prefect_shell, so the dbt task is
    # brought into scope here.
    from prefect_dbt.cli.commands import trigger_dbt_cli_command

    # Called without ``await``: this flow is a plain ``def``, where ``await``
    # is a syntax error. Other snippets in this thread call the same task
    # from a plain ``def`` flow in the same way.
    trigger_dbt_cli_command(
        command="dbt run",
        project_dir="C:\\Users\\micka\\Projects\\article_1\\",
    )
    shell_run_command("pip list")  # or any other valid Windows shell commands
# NOTE(review): these lines were pasted without their surrounding message and
# may be two separate fragments. Presumably they relate to the test_fail.py
# script that a later snippet runs with "python test_fail.py" — confirm
# against the original thread.
import time
time.sleep(3)
import test_fail
raise ImportError
from prefect import flow, task
from prefect_shell.commands import shell_run_command
@task
async def mimic_dbt_fail():
    """Await ``shell_run_command.fn`` on ``python test_fail.py``."""
    await shell_run_command.fn("python test_fail.py")


@flow
def test_flow():
    """Run the sleep-and-echo shell command, then the ``mimic_dbt_fail`` task."""
    shell_run_command("sleep 3 && echo mimics_airbyte")
    mimic_dbt_fail()


test_flow()
Mickael ANDRIEU
07/28/2022, 6:21 AMfrom prefect import flow
from prefect_airbyte.connections import trigger_sync
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
def airbyte_dbt_flow():
    """Sync an Airbyte connection on localhost:8000, then run ``dbt debug``.

    Returns:
        Whatever ``trigger_dbt_cli_command`` returns for the ``dbt debug`` call.
    """
    sync_run = trigger_sync(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        connection_id="56ccaf30-5f07-49cb-8013-cebe91434137",
    )
    # The sync result is passed through wait_for only; dbt receives no data
    # from it.
    return trigger_dbt_cli_command("dbt debug", wait_for=[sync_run])


if __name__ == "__main__":
    airbyte_dbt_flow()
wait_for
) it works as expectedAndrew Huang
07/29/2022, 4:49 PMpip install git+https://github.com/PrefectHQ/prefect@main