Eric Mauser
03/28/2022, 8:50 PMconnections = ['conn1', 'conn2',
'conn3']
with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
for conn_id in connections:
flow.add_task( AirbyteConnectionTask(
airbyte_server_host=<Airbyte host>
airbyte_server_port=<airbyte port>,
airbyte_api_version="v1",
connection_id=conn_id
)
)
flow.run(executor=LocalDaskExecutor)
Kevin Kho
airbyte = AirbyteConnectionTask(
airbyte_server_host=<Airbyte host>
airbyte_server_port=<airbyte port>,
airbyte_api_version="v1")
with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
airbyte.map(connection_id=connections)
but I think this one should already work in parallel. Can you try:
flow.executor=LocalDaskExecutor()
flow.register()
I think the real issue is you didn’t attach the error to the Flow. Only during runtimeEric Mauser
03/28/2022, 10:11 PM