# ask-community
b
Hey guys, I have a main flow that runs dependent flows, defined as shown in the attached file. All the flows are actually legacy Java pipelines running in containers on ECS Fargate, so each task defined in this graph triggers a job on ECS Fargate with all dependencies packaged within its container. The main flow executes fine, but it runs serially. The point is to run in parallel through the DAG, since each task is an independent ECS task. Is there something I am missing to execute the tasks/flows in parallel leveraging ECS?
k
Hey @Benjamin Bonhomme, are you using the StartFlowRun task or create_flow_run?
b
Hi Kevin, thank you for reaching out. I am using StartFlowRun.
The main flow is built pretty much exactly as described here: https://docs.prefect.io/core/idioms/flow-to-flow.html
a
Is this what you are trying to do?
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun

start_flow_run = StartFlowRun(project_name="PROJECT_NAME", wait=True)


with Flow("extract_load_parallel", executor=LocalDaskExecutor()) as flow:
    mapped_flows = start_flow_run.map(
        flow_name=["d", "u", "b", "e", "c"],
    )
k
StartFlowRun does have wait=False, so it should just start the flow run and then not worry about the result. Do you have wait=True?
But yes, like Anna said, LocalExecutor does not parallelize. Maybe you want to use the LocalDaskExecutor.
a
But we would have to wait for b and e to finish before starting s and j, correct? So then we can run d-u-b-e-c in parallel, wait for them to finish, and then run s-j downstream.
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun

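# wait=True makes each task block until its child flow run finishes
start_flow_run = StartFlowRun(project_name="PROJECT_NAME", wait=True)


with Flow("extract_load_parallel", executor=LocalDaskExecutor()) as flow:
    # Layer 1: these five child flows do not depend on each other
    mapped_flows = start_flow_run.map(flow_name=["d", "u", "b", "e", "c"])
    # Layer 2: s and j start only after all of layer 1 has finished
    mapped_flows_layer_2 = start_flow_run.map(flow_name=["s", "j"])
    mapped_flows_layer_2.set_upstream(mapped_flows)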

flow.visualize()
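
The two-layer ordering above can also be illustrated without Prefect. This is only a minimal sketch using the standard library: `run_child_flow` is a hypothetical stand-in for a blocking child flow run (it just sleeps), and a thread pool plays the role of the executor.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_child_flow(name, log):
    """Hypothetical stand-in for a blocking child flow run; it only sleeps."""
    log.append(("start", name))
    time.sleep(0.1)  # pretend to wait for the remote job to finish
    log.append(("end", name))
    return name


def run_layers(layers, max_workers=5):
    """Run each layer in parallel, finishing one layer before starting the next."""
    log = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for layer in layers:
            # list() blocks until every flow in this layer has finished
            list(pool.map(lambda name: run_child_flow(name, log), layer))
    return log


log = run_layers([["d", "u", "b", "e", "c"], ["s", "j"]])
```

In the resulting log, every "end" event for d, u, b, e and c appears before the first "start" event for s or j, which is the behaviour the `set_upstream` call is meant to give.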
b
@Anna Geller, @Kevin Kho, thank you for the tips. It looks like the LocalDaskExecutor does work. I need to investigate why, because it is beside my comprehension of the executor's role when using ECS and of how flows defined by tasks are executed. I also need to understand the impact of increasing the number of workers, since the LocalDaskExecutor is not one per se, just a construct for running tasks in parallel within the main flow execution.
a
it is beside my comprehension
You can really think of it as using normal multithreading and multiprocessing in Python - that’s what this executor is doing under the hood. LMK if you have any more questions about it
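
To make the threading analogy concrete, here is a minimal sketch that uses only the standard library rather than Prefect itself. `wait_for_task` is a hypothetical stand-in for a task that blocks while a remote ECS job runs (it just sleeps), and `num_workers` is an illustrative parameter for the pool size.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_for_task(name):
    """Hypothetical stand-in for a task blocked on a remote job; it only sleeps."""
    time.sleep(0.2)
    return name


def timed_run(names, num_workers):
    """Run all waits on a thread pool and return (results, elapsed seconds)."""
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(wait_for_task, names))
    return results, time.perf_counter() - started


names = ["d", "u", "b", "e", "c"]
serial_results, serial_time = timed_run(names, num_workers=1)
parallel_results, parallel_time = timed_run(names, num_workers=5)
print(f"1 worker: {serial_time:.2f}s, 5 workers: {parallel_time:.2f}s")
```

With one worker the waits run back to back; with five they overlap. Since the heavy lifting happens in ECS, the threads in the main flow mostly sit idle waiting, so the worker count mainly caps how many child flows can be in flight at once.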