Hello!
I’m trying to set up dependent flows on ECS with Dask following the example at https://docs-v1.prefect.io/core/idioms/flow-to-flow.html
I expected my child flows to each run on a dedicated Dask worker, but they run on the same machine as the parent flow. (code in thread)
Can anyone point me to the proper approach?
Anna Geller
09/05/2022, 6:11 PM
this Dask executor would need to be applied to the child flow referenced in the create_flow_run task, currently you attach it to the parent
Tom Kaszemacher
09/05/2022, 6:34 PM
Thanks @Anna Geller. When I attach it to the child flow, it starts as many Fargate instances (flow + scheduler) as there are child flows.
Tom Kaszemacher
09/05/2022, 6:36 PM
My intent is to start a parent flow on a Fargate instance, grab a list of inputs from there to size my cluster, and launch one child flow per input on an EC2 instance
Tom Kaszemacher
09/05/2022, 6:42 PM
I also updated my pseudo-code to show that I’m trying to use an adapt on the cluster
Anna Geller
09/05/2022, 8:09 PM
I understand what you're trying to do now, but AFAIK Dask Cloud Provider doesn't work that way. You can submit tasks to a Dask cluster running on Fargate, but you can't allocate flow runs to separate worker nodes; it's not that simple, because the Dask scheduler is the part that decides where each function call gets submitted. You can check the Dask docs about that or ask in https://dask.discourse.group
Anna Geller
09/05/2022, 9:05 PM
can you move the code block to the thread? this helps us to keep the main channel cleaner
Tom Kaszemacher
09/05/2022, 9:10 PM
parent.py

    from prefect import Flow, unmapped
    from prefect.executors import DaskExecutor
    from prefect.run_configs import ECSRun
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    from dask_cloudprovider.aws import ECSCluster

    with Flow(
        name='parent',
        run_config=ECSRun(
            image='MY_IMAGE',
            labels=['my-label'],
        ),
        executor=DaskExecutor(
            cluster_class=lambda: ECSCluster(**my_args),  # my_args: placeholder kwargs
            adapt_kwargs={'minimum': 0},
        ),
    ) as parent:
        input_list = get_my_inputs()
        # launch one child flow run per input
        sub_flows = create_flow_run.map(
            project_name=unmapped('project'),
            flow_name=unmapped('child'),
            parameters=[{'input': i} for i in input_list],
        )
        wait_for_flow_run.map(
            sub_flows,
            raise_final_state=unmapped(True),
            stream_logs=unmapped(True),
        )
child.py

    from prefect import Flow, Parameter

    with Flow(name='child') as child:
        input = Parameter('input')
        result1 = task1(input)
        result2 = task2(result1)