An Hoang
09/08/2021, 6:49 PM
from prefect import task

# input: a list of X: [X1, X2, X3, ...]
# input: total_times = 1_000_000
# property: x.do_work(1_000_000) = 1000 * x.do_work(1000)
# output needed: [X1.do_work(1_000_000), X2.do_work(1_000_000), ...]

@task
def long_running_task(x, n_times):
    result = x.do_work(n_times)
    return result
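A plain-Python sketch of the decomposition stated in the comments above, written outside Prefect so it runs anywhere. It assumes, which the thread does not state, that partial results combine by addition and that total_times is divisible by the chunk size. `Worker`, `run_chunked`, and the thread pool are hypothetical stand-ins for X and for the Dask workers.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


class Worker:
    """Hypothetical stand-in for X; its work is additive in n_times."""

    def __init__(self, weight):
        self.weight = weight

    def do_work(self, n_times):
        return self.weight * n_times


def long_running_task(x, n_times):
    return x.do_work(n_times)


def run_chunked(xs, total_times=1_000_000, chunk=1_000):
    """Split each x's work into total_times // chunk units and recombine."""
    n_chunks = total_times // chunk
    # Flat list of work units: every x repeated n_chunks times.
    units = [x for x in xs for _ in range(n_chunks)]
    with ThreadPoolExecutor() as pool:
        # repeat(chunk) hands the same constant to every call,
        # so no list of [1000] * n_chunks is needed.
        partials = list(pool.map(long_running_task, units, repeat(chunk)))
    # Regroup the flat partial results, one slice of n_chunks per x.
    return [
        sum(partials[i * n_chunks:(i + 1) * n_chunks])
        for i in range(len(xs))
    ]
```

Usage: `run_chunked([Worker(1), Worker(3)])` yields one total per worker, in the same order as the input list. The regrouping step relies on the flat list keeping each x's units contiguous.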
I have hundreds of Dask workers and want to split this work into units of x.do_work(1000) to get the output as fast as possible by maximizing parallelism. How should I write my mapping functions to achieve this? Do I just generate a list X_list = [x] * 1000 for each X and a list iteration_list = [1000] * 1000, then call long_running_task.map(x=flatten(X_list), n_times=flatten(iteration_list))?
Kevin Kho
flatten and you probably want n_times=unmapped(1000) instead of making a list.
An Hoang
09/09/2021, 6:28 PM
from prefect import Flow, flatten, task

class TestObj:
    def multiply_by_two(self, num):
        return num * 2

@task
def generate_test_obj(num_objects):
    return {
        "objects": [TestObj() for _ in range(num_objects)],
        "int_list": list(range(num_objects)),
    }

@task
def call_multiply_by_two(obj, num):
    return obj.multiply_by_two(num)

with Flow("test_flow") as test_flow:
    int_arr = [1, 4, 6, 10]
    generate_test_obj_result = generate_test_obj.map(num_objects=int_arr)
    result = call_multiply_by_two.map(
        obj=flatten(generate_test_obj_result["objects"]),
        num=flatten(generate_test_obj_result["int_list"]),
    )

test_flow.visualize()
Here's the error I got:
[2021-09-09 14:26:39-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow'
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj': Starting task run...
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj': Finished task run for task with final state: 'Mapped'
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[0]': Starting task run...
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[0]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[1]': Starting task run...
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[1]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[2]': Starting task run...
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[2]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[3]': Starting task run...
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[3]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['int_list']': Starting task run...
[2021-09-09 14:26:40-0400] ERROR - prefect.TaskRunner | Task "generate_test_obj['int_list']": Exception encountered during task execution!
Traceback (most recent call last):
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 441, in method
    return run_method(self, *args, **kwargs)
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/tasks/core/operators.py", line 38, in run
    return task_result[key]
TypeError: list indices must be integers or slices, not str
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['int_list']': Finished task run for task with final state: 'Failed'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['objects']': Starting task run...
[2021-09-09 14:26:40-0400] ERROR - prefect.TaskRunner | Task "generate_test_obj['objects']": Exception encountered during task execution!
Traceback (most recent call last):
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 441, in method
    return run_method(self, *args, **kwargs)
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/tasks/core/operators.py", line 38, in run
    return task_result[key]
TypeError: list indices must be integers or slices, not str
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['objects']': Finished task run for task with final state: 'Failed'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'call_multiply_by_two': Starting task run...
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'call_multiply_by_two': Finished task run for task with final state: 'Failed'
[2021-09-09 14:26:40-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
<Failed: "Some reference tasks failed.">
Do you know why that didn't work?
Kevin Kho
generate_test_obj_result["objects"]. List access is run during build time, but there is no value yet because of deferred execution. You need a task that pulls out the objects, so that the access is deferred until generate_test_obj_result has a value.
Kevin Kho
result = call_multiply_by_two.map(obj=unmapped(TestObj()), num=flatten(generate_test_obj_result["int_list"]))
where unmapped is imported from prefect.
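A small plain-Python illustration of what passing one object to every mapped call can mean for object state. It is only a sketch: `Counter` and `run_on_copy` are hypothetical, `itertools.repeat` stands in for `unmapped`, and `copy.deepcopy` stands in for the serialization round trip that a distributed executor such as Dask would presumably perform when shipping the object to a worker. That last point is an assumption, not something confirmed in this thread.

```python
import copy
from itertools import repeat


class Counter:
    """Hypothetical stateful variant of TestObj."""

    def __init__(self):
        self.calls = 0

    def multiply_by_two(self, num):
        self.calls += 1  # state change on every call
        return num * 2


def call_multiply_by_two(obj, num):
    return obj.multiply_by_two(num)


nums = [0, 1, 2, 3]

# Same process: every call receives a reference to the same object,
# so state changes accumulate on it.
shared = Counter()
local_results = list(map(call_multiply_by_two, repeat(shared), nums))


def run_on_copy(obj, num):
    # deepcopy approximates sending the object to another worker:
    # the call mutates the copy, not the original.
    worker_copy = copy.deepcopy(obj)
    return worker_copy.multiply_by_two(num), worker_copy.calls


fresh = Counter()
remote_like = [run_on_copy(fresh, n) for n in nums]
```

In the first case `shared.calls` ends up equal to the number of mapped calls. In the second, `fresh.calls` stays at zero because each call only touched its own copy. If the real object carries state that must be shared across workers, that state would need to live somewhere external rather than on the object.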
Kevin Kho
An Hoang
09/09/2021, 6:35 PM
flatten needs access to that list during build time, right?
Kevin Kho
An Hoang
09/09/2021, 6:37 PM
Does result = call_multiply_by_two.map(obj=unmapped(TestObj()), num=flatten(generate_test_obj_result["int_list"])) create multiple pointers to the same TestObj? What happens if multiply_by_two changes the object state? How does parallelism work in this case?
An Hoang
09/09/2021, 6:38 PM
Kevin Kho
TestObj(). This link you gave is news to me. Let me try it 😆
Kevin Kho
List[Dict]
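The `TypeError` in the log above is consistent with this `List[Dict]` shape: `generate_test_obj` was mapped over four inputs, so its combined result is a list of dictionaries, and indexing that list with a string key fails. The sketch below reproduces the shape in plain Python, without Prefect. `get_key` is a hypothetical helper, and `object()` replaces `TestObj()` for brevity.

```python
from itertools import chain


def generate_test_obj(num_objects):
    return {
        "objects": [object() for _ in range(num_objects)],
        "int_list": list(range(num_objects)),
    }


# Mapping over four inputs gives a list of dicts, not a dict of lists.
mapped_result = [generate_test_obj(n) for n in [1, 4, 6, 10]]

# Indexing the whole list with a string key reproduces the error.
try:
    mapped_result["int_list"]
except TypeError as exc:
    error_message = str(exc)


def get_key(result, key):
    # Hypothetical helper: pull one key out of a single dict.
    return result[key]


# Extract the key per element first, then flatten the nested lists.
int_lists = [get_key(r, "int_list") for r in mapped_result]
flat_ints = list(chain.from_iterable(int_lists))
```

In Prefect terms this would presumably translate to mapping a small key-extraction task over `generate_test_obj_result` and passing its output to `flatten`, though that variant is not tested here.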
An Hoang
09/09/2021, 7:01 PM
Dict[List] or because of it in combination with flatten?
An Hoang
09/09/2021, 7:02 PM
Kevin Kho