
Tom Kaszemacher

09/05/2022, 5:20 PM
Hello! I’m trying to set up dependent flows on ECS with Dask, following the example at https://docs-v1.prefect.io/core/idioms/flow-to-flow.html. I expected each of my child flows to run on a dedicated Dask worker, but they all run on the same machine as the parent flow (code in thread). Can anyone point me to the proper approach?

Anna Geller

09/05/2022, 6:11 PM
This Dask executor would need to be attached to the child flow referenced in the create_flow_run task; currently you attach it to the parent.

Tom Kaszemacher

09/05/2022, 6:34 PM
Thanks @Anna Geller. When I attach it to the child flow, it starts as many flow and scheduler Fargate instances as there are child flows.
My intent is to start a parent flow on a Fargate instance, grab a list of inputs there to size my cluster, and launch one child flow per input on an EC2 instance.
I also updated my pseudo-code to show that I’m trying to use adapt on the cluster.
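As a rough illustration of "sizing the cluster from the input list", here is a minimal sketch of deriving adaptive-scaling bounds from the number of inputs. The helper name `adapt_bounds` and its `runs_per_worker` and `max_workers` parameters are hypothetical; only the `minimum` and `maximum` keys correspond to arguments that Dask's `adapt()` accepts. It only bounds the size of the worker pool and says nothing about which worker ends up running which piece of work.

```python
import math


def adapt_bounds(n_inputs: int, runs_per_worker: int = 1, max_workers: int = 10) -> dict:
    """Hypothetical helper: derive adapt() bounds from the number of inputs."""
    if n_inputs < 0 or runs_per_worker < 1 or max_workers < 0:
        raise ValueError("n_inputs and max_workers must be >= 0, runs_per_worker >= 1")
    # Enough workers to cover every input, rounded up, but never above the cap.
    wanted = math.ceil(n_inputs / runs_per_worker)
    return {"minimum": 0, "maximum": min(wanted, max_workers)}


inputs = ["a", "b", "c"]  # stand-in for the list returned by the parent flow
bounds = adapt_bounds(len(inputs))
print(bounds)
```

The resulting dict has the same shape as the `adapt_kwargs` passed to `DaskExecutor` in the pseudo-code, under the assumption that the input count is already known when the executor is constructed.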

Anna Geller

09/05/2022, 8:09 PM
I understand what you're trying to do now, but AFAIK Dask Cloud Provider doesn't work that way. You can submit tasks to a Dask cluster running on Fargate, but you can't allocate flow runs to separate worker nodes. It's not that simple, because the Dask scheduler is the component that decides where each function call gets submitted. You can check the Dask docs on this or ask in https://dask.discourse.group
Can you move the code block to the thread? This helps us keep the main channel cleaner.

Tom Kaszemacher

09/05/2022, 9:10 PM
parent.py

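from dask_cloudprovider.aws import ECSCluster
from prefect import Flow, task, unmapped
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def build_parameters(inputs):
    # One parameter dict per child flow run; built in a task because the input list only exists at runtime.
    return [{'input': item} for item in inputs]

with Flow(
        name='parent',
        run_config=ECSRun(
            image='MY_IMAGE',
            labels=['my-label']
        ),
        # my_args: dict of ECSCluster keyword arguments, defined elsewhere
        executor=DaskExecutor(cluster_class=lambda: ECSCluster(**my_args), adapt_kwargs={'minimum': 0})
        ) as parent:

    input_list = get_my_inputs()  # assumed to be a @task defined elsewhere

    sub_flows = create_flow_run.map(
        project_name=unmapped('project'),
        flow_name=unmapped('child'),
        parameters=build_parameters(input_list))

    wait_for_flow_run.map(sub_flows,
                          raise_final_state=unmapped(True),
                          stream_logs=unmapped(True))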
child.py

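from prefect import Flow, Parameter

# task1 and task2 are assumed to be @task-decorated functions defined elsewhere.
with Flow(name='child') as child:
    # The parameter name must match the key passed by the parent flow.
    input_value = Parameter('input')

    result1 = task1(input_value)
    result2 = task2(result1)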