Eric Mauser

03/28/2022, 8:50 PM
hey everyone, I'm trying to parallelize a flow that consists of several AirbyteConnectionTasks. The flow runs fine on the ECS Agent, but it runs each task in series. Is it possible to run the tasks in parallel when I'm generating them via a for loop, or does Prefect just run the tasks as they are generated? If it is possible, how would this be done? Here is a toy code sample of what I'm working with:
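from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

connections = ['conn1', 'conn2', 'conn3']

# RUN_CONFIG, STORAGE and SCHEDULE are defined elsewhere in the real flow
with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
    for conn_id in connections:
        # one independent task per Airbyte connection
        flow.add_task(
            AirbyteConnectionTask(
                airbyte_server_host="<Airbyte host>",  # placeholder
                airbyte_server_port="<airbyte port>",  # placeholder
                airbyte_api_version="v1",
                connection_id=conn_id,
            )
        )

# local run with a Dask-based executor (an instance, not the class)
flow.run(executor=LocalDaskExecutor())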
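The loop version can parallelize at all because the three tasks have no upstream dependencies on each other, so an executor is free to run them concurrently. The sketch below illustrates sequential versus concurrent execution outside Prefect, using only the standard library. `sync_connection` is a hypothetical stand-in that sleeps instead of calling Airbyte, and the thread pool only approximates what a thread-based executor such as LocalDaskExecutor does.

```python
import time
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-in for one Airbyte sync: it sleeps to mimic waiting on I/O.
def sync_connection(conn_id: str, seconds: float = 0.2) -> str:
    time.sleep(seconds)
    return f"{conn_id} synced"


connections = ["conn1", "conn2", "conn3"]


def run_sequential(conn_ids):
    # Roughly what a sequential executor does: one task after another.
    return [sync_connection(c) for c in conn_ids]


def run_parallel(conn_ids, workers: int = 3):
    # Independent tasks submitted to a thread pool run concurrently;
    # results are collected in submission order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(sync_connection, c) for c in conn_ids]
        return [f.result() for f in futures]


if __name__ == "__main__":
    for runner in (run_sequential, run_parallel):
        start = time.perf_counter()
        results = runner(connections)
        print(runner.__name__, results, f"{time.perf_counter() - start:.2f}s")
```

With three 0.2 s "syncs", the sequential run takes about 0.6 s and the threaded run about 0.2 s, because the waits overlap.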

Kevin Kho

03/28/2022, 8:53 PM
You can run in parallel with mapping instead of the for loop:
airbyte = AirbyteConnectionTask(
    airbyte_server_host="<Airbyte host>",
    airbyte_server_port="<airbyte port>",
    airbyte_api_version="v1",
)
with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
    # one mapped child task run per connection ID
    airbyte.map(connection_id=connections)
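Mapping applies one task definition to every element of a list, producing one child run per element. As a rough standard-library analogy (not Prefect's implementation), `functools.partial` plays the role of the shared constructor arguments and `ThreadPoolExecutor.map` plays the role of `.map()`. `trigger_sync` and the host/port values are hypothetical placeholders.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


# Hypothetical task body: a real task would call the Airbyte API here;
# this sketch only echoes its inputs.
def trigger_sync(connection_id: str, *, host: str, port: int, api_version: str) -> dict:
    return {"connection_id": connection_id, "url": f"http://{host}:{port}/api/{api_version}"}


# Shared settings bound once, like the arguments given to AirbyteConnectionTask(...).
# The host and port here are placeholder values.
airbyte = partial(trigger_sync, host="localhost", port=8000, api_version="v1")

connections = ["conn1", "conn2", "conn3"]

# Analogue of airbyte.map(connection_id=connections): one call per element,
# run on a thread pool, with results returned in input order.
with ThreadPoolExecutor(max_workers=len(connections)) as pool:
    results = list(pool.map(airbyte, connections))

for r in results:
    print(r)
```

Binding the shared settings once and varying only `connection_id` keeps a single task definition, which is the same shape as the mapped Prefect version above.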
But I think your original for-loop version should already work in parallel. Can you try:
flow.executor = LocalDaskExecutor()
flow.register()
I think the real issue is that you didn't attach the executor to the Flow; you only passed it at runtime through flow.run().
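The point about attaching the executor comes down to where the setting lives. An argument to `flow.run()` applies only to that one local call, while `flow.executor` is stored on the flow object that gets registered and later run by the agent. The toy model below is hypothetical (`ToyFlow`, `agent_run`, and both executor classes are not Prefect APIs). It assumes a fallback order of per-call argument, then the attached executor, then a sequential default.

```python
from typing import Optional


class SequentialExecutor:
    name = "sequential"


class ThreadedExecutor:
    name = "threaded"


class ToyFlow:
    """Hypothetical toy model of a flow, not Prefect's Flow class."""

    def __init__(self, executor: Optional[object] = None):
        # Stored on the object, so it travels with the flow when it is registered.
        self.executor = executor

    def run(self, executor: Optional[object] = None) -> str:
        # A per-call argument wins for this call only; otherwise fall back to
        # the attached executor, then to a sequential default.
        chosen = executor or self.executor or SequentialExecutor()
        return chosen.name


def agent_run(flow: ToyFlow) -> str:
    # The agent runs the stored flow without any per-call executor argument.
    return flow.run()


flow = ToyFlow()
print(flow.run(executor=ThreadedExecutor()))  # local call: threaded
print(agent_run(flow))                        # agent: sequential default

flow.executor = ThreadedExecutor()
print(agent_run(flow))                        # agent: threaded
```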

Eric Mauser

03/28/2022, 10:11 PM
Thanks for the tip, @Kevin Kho. The bottom code snippet works. I'll try out the mapping as well.