Thread
#prefect-community
    Edvard Kristiansen

    Edvard Kristiansen

    6 months ago
    Airbyte Task Operator Naming I am attempting to set up a flow which triggers a number of Airbyte pipelines using the pre-made Prefect Airbyte task. The run is working as expected using these docs as guide(https://docs.airbyte.com/operator-guides/using-prefect-task). However, all the tasks are called AirbyteConnectionTask which makes it harder to troubleshoot errors etc. This is the code:
    from prefect import Flow, task
    from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
    from datetime import timedelta
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    from prefect.executors import LocalDaskExecutor
    
    
    schedule = Schedule(clocks=[CronClock("0 0 4 * * *")])
    
    
    pipeline1 = AirbyteConnectionTask(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        airbyte_api_version="v1",
        connection_id="xx-xx-xx")
    
    pipeline2 = AirbyteConnectionTask(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        airbyte_api_version="v1",
        connection_id="xx-xx-xx")
    
    pipeline3 = AirbyteConnectionTask(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        airbyte_api_version="v1",
        connection_id="xx-xx-xx")
    
    pipeline4 = AirbyteConnectionTask(
        airbyte_server_host="localhost",
        airbyte_server_port=8000,
        airbyte_api_version="v1",
        connection_id="xx-xx-xx")
    
    
    
    with Flow("dwh Sync", schedule, executor=LocalDaskExecutor()) as flow:
        flow.add_task(pipeline1)
        flow.add_task(pipeline2)
        flow.add_task(pipeline3)
        flow.add_task(pipeline4)
    
    # Register the flow under the "airbyte" project
    flow.register(project_name="airbyte")
    I have already tried to use the @ task operator along with a function, but the task then finishes in a second without actually triggering the pipeline.
    This is what it looks like.
    alex

    alex

    6 months ago
    Hey @Edvard Kristiansen! You can pass
    name
    as a kwarg to each of those
    AirbyteConnectionTask
    instantiations to customize the name that displays in the Prefect Cloud UI.
    Edvard Kristiansen

    Edvard Kristiansen

    6 months ago
    AH! Thank you!
    Kevin Kho

    Kevin Kho

    6 months ago
    Hey @Edvard Kristiansen, I don’t think you need the
    add_task
    here too. You can just call them:
    with Flow("dwh Sync", schedule, executor=LocalDaskExecutor()) as flow:
        pipeline1()
        pipeline2()
        ...
    Edvard Kristiansen

    Edvard Kristiansen

    6 months ago
    @Kevin Kho Even better! Thanks for that!