# prefect-community

Eric Mauser

03/28/2022, 8:50 PM
hey everyone, I'm trying to parallelize a flow that consists of several AirbyteConnectionTasks. The flow runs fine on the ECS Agent, but it runs each task in series. Is it possible to run the tasks in parallel when I'm generating them in a for loop, or is Prefect just running the tasks in the order they are generated? If it is possible, how would this be done? Here is a toy code sample of what I'm working with:
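```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

# RUN_CONFIG, STORAGE and SCHEDULE are defined elsewhere
connections = ["conn1", "conn2", "conn3"]

with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
    for conn_id in connections:
        # One independent Airbyte sync task per connection (host/port are placeholders)
        flow.add_task(
            AirbyteConnectionTask(
                airbyte_server_host="<Airbyte host>",
                airbyte_server_port="<airbyte port>",
                airbyte_api_version="v1",
                connection_id=conn_id,
            )
        )

# The executor is passed only for this local run
flow.run(executor=LocalDaskExecutor())
```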

Kevin Kho

03/28/2022, 8:53 PM
You can run in parallel with mapping instead of the for loop:
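```python
from prefect import Flow
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
# Shared settings (placeholders); connection_id is supplied per mapped run below
airbyte = AirbyteConnectionTask(
    airbyte_server_host="<Airbyte host>",
    airbyte_server_port="<airbyte port>",
    airbyte_api_version="v1",
)
with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
    # One mapped child task run per connection ID
    airbyte.map(connection_id=connections)
```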
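Conceptually, mapping produces one task run per element of `connections`, and the executor decides whether those runs happen one after another or concurrently. The stdlib-only sketch below imitates that difference. `trigger_sync` is a hypothetical stand-in for an Airbyte sync (it just sleeps), and the thread pool is only an analogy for a thread-based executor such as LocalDaskExecutor's default scheduler, not Prefect's actual implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-in for one Airbyte sync; the real task calls the Airbyte API.
def trigger_sync(connection_id: str) -> str:
    time.sleep(0.2)  # simulate waiting on the sync to finish
    return f"{connection_id}: done"


connections = ["conn1", "conn2", "conn3"]

# Serial: each sync waits for the previous one (like a sequential executor).
start = time.perf_counter()
serial_results = [trigger_sync(c) for c in connections]
serial_elapsed = time.perf_counter() - start

# Parallel: one call per element submitted to a thread pool (loosely like .map()
# running under a thread-based executor).
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(connections)) as pool:
    parallel_results = list(pool.map(trigger_sync, connections))
parallel_elapsed = time.perf_counter() - start

print(serial_results == parallel_results)  # True: pool.map preserves input order
print(f"serial ~{serial_elapsed:.2f}s, parallel ~{parallel_elapsed:.2f}s")
```

Both versions produce the same results; only the wall-clock time differs, which is why the choice of executor, not the way the tasks are created, determines whether they overlap.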
That said, I think your for-loop version should already run in parallel. Can you try:
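```python
from prefect.executors import LocalDaskExecutor

flow.executor = LocalDaskExecutor()  # attach the executor to the flow itself
flow.register()
```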
I think the real issue is that you didn't attach the executor to the Flow. You only passed it at runtime with `flow.run()`, so the registered flow that the ECS agent runs never sees it.
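To see why attaching matters, here is a deliberately simplified model, assuming (as in Prefect 1.x) that the agent loads the flow object from storage and starts it without any run-time arguments. `ToyFlow`, `register` and `agent_run` are hypothetical names for illustration only, not Prefect APIs.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ToyFlow:
    """Hypothetical stand-in for a flow; not Prefect's Flow class."""
    name: str
    executor: Optional[str] = None  # attribute travels with the flow object

    def run(self, executor: Optional[str] = None) -> str:
        # A run-time argument applies to this call only; otherwise use the
        # attached executor, falling back to a sequential default.
        return executor or self.executor or "sequential"


# Hypothetical stand-ins for flow storage and an agent.
storage: Dict[str, ToyFlow] = {}


def register(flow: ToyFlow) -> None:
    storage[flow.name] = flow  # only the flow object itself is kept


def agent_run(name: str) -> str:
    return storage[name].run()  # the agent passes no run-time arguments


flow = ToyFlow("flow_name")
local_mode = flow.run(executor="parallel")  # local run: executor passed at runtime
register(flow)
agent_mode_before = agent_run("flow_name")  # executor was never attached

flow.executor = "parallel"  # analogous to flow.executor = LocalDaskExecutor()
register(flow)
agent_mode_after = agent_run("flow_name")

print(local_mode, agent_mode_before, agent_mode_after)  # parallel sequential parallel
```

The design point is that an argument to `run()` lives only for that one call, while an attribute set on the flow goes wherever the flow object is stored and loaded.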

Eric Mauser

03/28/2022, 10:11 PM
Thanks for the tip @Kevin Kho. The second snippet (attaching the executor to the flow) works. I'll try out the mapping as well.