Edvard Kristiansen
02/28/2022, 7:26 PMfrom prefect import Flow, task
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.executors import LocalDaskExecutor
schedule = Schedule(clocks=[CronClock("0 0 4 * * *")])
pipeline1 = AirbyteConnectionTask(
airbyte_server_host="localhost",
airbyte_server_port=8000,
airbyte_api_version="v1",
connection_id="xx-xx-xx")
pipeline2 = AirbyteConnectionTask(
airbyte_server_host="localhost",
airbyte_server_port=8000,
airbyte_api_version="v1",
connection_id="xx-xx-xx")
pipeline3 = AirbyteConnectionTask(
airbyte_server_host="localhost",
airbyte_server_port=8000,
airbyte_api_version="v1",
connection_id="xx-xx-xx")
pipeline4 = AirbyteConnectionTask(
airbyte_server_host="localhost",
airbyte_server_port=8000,
airbyte_api_version="v1",
connection_id="xx-xx-xx")
with Flow("dwh Sync", schedule, executor=LocalDaskExecutor()) as flow:
flow.add_task(pipeline1)
flow.add_task(pipeline2)
flow.add_task(pipeline3)
flow.add_task(pipeline4)
# Register the flow under the "airbyte" project
flow.register(project_name="airbyte")
I have already tried to use the @ task operator along with a function, but the task then finishes in a second without actually triggering the pipeline.alex
02/28/2022, 7:29 PMname
as a kwarg to each of those AirbyteConnectionTask
instantiations to customize the name that displays in the Prefect Cloud UI.Edvard Kristiansen
02/28/2022, 7:30 PMKevin Kho
add_task
here too. You can just call them:
with Flow("dwh Sync", schedule, executor=LocalDaskExecutor()) as flow:
pipeline1()
pipeline2()
...
Edvard Kristiansen
02/28/2022, 8:54 PM