Edvard Kristiansen

02/28/2022, 7:26 PM
Airbyte Task Operator Naming
I am trying to set up a flow that triggers several Airbyte pipelines using the pre-made Prefect Airbyte task. The run works as expected, using these docs as a guide (https://docs.airbyte.com/operator-guides/using-prefect-task). However, every task is named AirbyteConnectionTask, which makes it harder to troubleshoot errors. This is the code:
from prefect import Flow, task
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
from datetime import timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.executors import LocalDaskExecutor


schedule = Schedule(clocks=[CronClock("0 0 4 * * *")])


pipeline1 = AirbyteConnectionTask(
    airbyte_server_host="localhost",
    airbyte_server_port=8000,
    airbyte_api_version="v1",
    connection_id="xx-xx-xx")

pipeline2 = AirbyteConnectionTask(
    airbyte_server_host="localhost",
    airbyte_server_port=8000,
    airbyte_api_version="v1",
    connection_id="xx-xx-xx")

pipeline3 = AirbyteConnectionTask(
    airbyte_server_host="localhost",
    airbyte_server_port=8000,
    airbyte_api_version="v1",
    connection_id="xx-xx-xx")

pipeline4 = AirbyteConnectionTask(
    airbyte_server_host="localhost",
    airbyte_server_port=8000,
    airbyte_api_version="v1",
    connection_id="xx-xx-xx")



with Flow("dwh Sync", schedule, executor=LocalDaskExecutor()) as flow:
    flow.add_task(pipeline1)
    flow.add_task(pipeline2)
    flow.add_task(pipeline3)
    flow.add_task(pipeline4)

# Register the flow under the "airbyte" project
flow.register(project_name="airbyte")
I have already tried using the @task decorator with a function, but the task then finishes within a second without actually triggering the pipeline.
This is what it looks like.

alex

02/28/2022, 7:29 PM
Hey @Edvard Kristiansen! You can pass name as a kwarg to each of those AirbyteConnectionTask instantiations to customize the name that displays in the Prefect Cloud UI.
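A minimal sketch of that suggestion, assuming Prefect 1.x, where the import path from the question exists and AirbyteConnectionTask accepts name as a keyword argument as described above. The display names sync_orders and sync_customers are hypothetical, and the connection IDs are the placeholders from the question.

```python
# Settings shared by every Airbyte task, copied from the question.
COMMON = {
    "airbyte_server_host": "localhost",
    "airbyte_server_port": 8000,
    "airbyte_api_version": "v1",
}

# Hypothetical display names mapped to placeholder connection IDs.
PIPELINES = {
    "sync_orders": "xx-xx-xx",
    "sync_customers": "xx-xx-xx",
}

# One kwargs dict per task; `name` replaces the default class name in the UI.
task_kwargs = [
    {"name": name, "connection_id": conn_id, **COMMON}
    for name, conn_id in PIPELINES.items()
]

try:
    # Import path as used in the question (available in Prefect 1.x only).
    from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

    pipelines = [AirbyteConnectionTask(**kw) for kw in task_kwargs]
except ImportError:
    # Prefect 1.x is not installed here, so only the kwargs are built.
    pipelines = []
```

Keeping the shared settings in one dict also removes the repetition across the four instantiations in the question.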

Edvard Kristiansen

02/28/2022, 7:30 PM
AH! Thank you!

Kevin Kho

02/28/2022, 8:37 PM
Hey @Edvard Kristiansen, I don’t think you need add_task here either. You can just call them:
with Flow("dwh Sync", schedule, executor=LocalDaskExecutor()) as flow:
    pipeline1()
    pipeline2()
    ...
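Putting both replies together, the flow definition could look roughly like the sketch below. It assumes Prefect 1.x and reuses the schedule, executor, and flow name from the question unchanged; build_flow is a hypothetical helper name, not part of Prefect.

```python
from typing import Any, Iterable


def build_flow(tasks: Iterable[Any], flow_name: str = "dwh Sync"):
    """Build a Prefect 1.x flow that runs each already-instantiated task."""
    # Imported lazily so this module loads without Prefect; requires Prefect 1.x.
    from prefect import Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # Same cron string as in the question, copied unchanged.
    schedule = Schedule(clocks=[CronClock("0 0 4 * * *")])

    with Flow(flow_name, schedule, executor=LocalDaskExecutor()) as flow:
        for t in tasks:
            # Calling a task inside the Flow context adds it to the flow,
            # so flow.add_task(...) is not needed.
            t()
    return flow
```

With the task instances from the question, this would be used as flow = build_flow([pipeline1, pipeline2, pipeline3, pipeline4]), followed by flow.register(project_name="airbyte") as before.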

Edvard Kristiansen

02/28/2022, 8:54 PM
@Kevin Kho Even better! Thanks for that!