# ask-community
a
Maybe I'm overthinking this, but I need to accomplish the following:
#input : Have a list of X: [X1,X2, X3 ...etc]
#input: total_times = 1_000_000
#property: x.do_work(1_000_000) = 1000 * x.do_work(1000)
#output needed: [X1.do_work(1_000_000), X2.do_work(1_000_000) ..etc]

@task
def long_running_task(x, n_times):
    result = x.do_work(n_times)
    return result
I have hundreds of Dask workers and want to split this work into units of
x.do_work(1000)
to get the output as fast as possible through maximizing parallelism. How should I write my mapping functions to achieve this? Do I just generate a list of
X_list = [x] *1000
for each
X
and a list of
iteration_list = [1000] * 1000
, then do
long_running_task.map(x=flatten(X_list), n_times = flatten(iteration_list))
?
k
I think this is right if you use Prefect for it, except that you might not need
flatten
and you probably want
n_times=unmapped(1000)
instead of making a list.
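A minimal plain-Python sketch of the bookkeeping behind this suggestion, without Prefect itself. The `Worker` class, `build_work_units`, and `regroup` are hypothetical names made up for illustration, and summing the per-chunk results is only an assumption about how the chunks combine. The list comprehension over `units` stands in for the mapped task call, where the chunk size of 1000 would be held constant (for example with `unmapped(1000)`).

```python
class Worker:
    """Hypothetical stand-in for X; do_work just reports how many iterations it ran."""

    def __init__(self, name):
        self.name = name

    def do_work(self, n_times):
        return n_times


def build_work_units(xs, total_times, chunk_size):
    """Repeat each object once per chunk, assuming total_times divides evenly by chunk_size."""
    n_chunks = total_times // chunk_size
    return [x for x in xs for _ in range(n_chunks)]


def regroup(results, n_objects):
    """Slice the flat result list back into one sub-list per original object."""
    n_chunks = len(results) // n_objects
    return [results[i * n_chunks:(i + 1) * n_chunks] for i in range(n_objects)]


xs = [Worker("X1"), Worker("X2"), Worker("X3")]
units = build_work_units(xs, total_times=1_000_000, chunk_size=1000)

# Stand-in for the mapped call: one unit of work per (object, 1000) pair.
results = [x.do_work(1000) for x in units]

# Assumption: per-chunk results combine by summation.
totals = [sum(chunk) for chunk in regroup(results, len(xs))]
```

Because the objects are repeated in order, the flat result list can be sliced back into per-object groups without tracking any extra index.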
a
@Kevin Kho so I tried something like the minimal example below:
from prefect import Flow, flatten, task


class TestObj:

    def multiply_by_two(self, num):
        return num * 2


@task
def generate_test_obj(num_objects):
    # Each call returns a single dict holding two lists of equal length.
    return {"objects": [TestObj() for _ in range(num_objects)],
            "int_list": list(range(num_objects))}


@task
def call_multiply_by_two(obj, num):
    return obj.multiply_by_two(num)


with Flow("test_flow") as test_flow:
    int_arr = [1, 4, 6, 10]
    generate_test_obj_result = generate_test_obj.map(num_objects=int_arr)
    result = call_multiply_by_two.map(
        obj=flatten(generate_test_obj_result["objects"]),
        num=flatten(generate_test_obj_result["int_list"]),
    )

test_flow.visualize()
Here's the error I got:
[2021-09-09 14:26:39-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow'
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj': Starting task run...
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj': Finished task run for task with final state: 'Mapped'
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[0]': Starting task run...
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[0]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[1]': Starting task run...
[2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[1]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[2]': Starting task run...
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[2]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[3]': Starting task run...
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[3]': Finished task run for task with final state: 'Success'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['int_list']': Starting task run...
[2021-09-09 14:26:40-0400] ERROR - prefect.TaskRunner | Task "generate_test_obj['int_list']": Exception encountered during task execution!
Traceback (most recent call last):
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 441, in method
    return run_method(self, *args, **kwargs)
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/tasks/core/operators.py", line 38, in run
    return task_result[key]
TypeError: list indices must be integers or slices, not str
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['int_list']': Finished task run for task with final state: 'Failed'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['objects']': Starting task run...
[2021-09-09 14:26:40-0400] ERROR - prefect.TaskRunner | Task "generate_test_obj['objects']": Exception encountered during task execution!
Traceback (most recent call last):
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 441, in method
    return run_method(self, *args, **kwargs)
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/tasks/core/operators.py", line 38, in run
    return task_result[key]
TypeError: list indices must be integers or slices, not str
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['objects']': Finished task run for task with final state: 'Failed'
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'call_multiply_by_two': Starting task run...
[2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'call_multiply_by_two': Finished task run for task with final state: 'Failed'
[2021-09-09 14:26:40-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
<Failed: "Some reference tasks failed.">
Do you know why that didn't work?
k
I think this is because you can’t do
generate_test_obj_result["objects"]
. List access is run during build time, but there is no value yet because of deferred execution. You need a task that will pull out objects to defer the execution to when
generate_test_obj_result
has a value.
Also I think you can do:
result = call_multiply_by_two.map(obj = unmapped(TestObj()), num = flatten(generate_test_obj_result["int_list"]))
where
unmapped
is imported from
prefect
See this
a
Ah thanks! I have used dict key access before and it worked, but it's a special case this time since the value in the key is a list and
flatten
needs access to that list during build time, right?
k
It works if the dict is already defined during build time (not a task output)
a
Would
result = call_multiply_by_two.map(obj = unmapped(TestObj()), num = flatten(generate_test_obj_result["int_list"]))
create multiple pointers of the same
TestObj
? What happens if
multiply_by_two
changes the object state? How does parallelism work in this case?
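A small plain-Python sketch of the sharing question, outside Prefect. `Counter` is a hypothetical stateful stand-in for a `TestObj` that mutates itself. `copy.deepcopy` is used here only as a stand-in for the serialization round trip that a distributed executor such as Dask would typically perform when shipping arguments to workers; that behavior is an assumption about the executor, not something shown in this thread.

```python
import copy


class Counter:
    """Hypothetical stateful object: counts how often multiply_by_two is called."""

    def __init__(self):
        self.calls = 0

    def multiply_by_two(self, num):
        self.calls += 1
        return num * 2


# One object referenced three times: every call mutates the same instance.
shared = [Counter()] * 3
for obj, num in zip(shared, [1, 2, 3]):
    obj.multiply_by_two(num)
shared_calls = shared[0].calls

# Three distinct objects: each one sees only its own call.
separate = [Counter() for _ in range(3)]
for obj, num in zip(separate, [1, 2, 3]):
    obj.multiply_by_two(num)
separate_calls = [obj.calls for obj in separate]

# A copied object (stand-in for one sent to a remote worker) does not
# propagate its state change back to the original.
original = Counter()
worker_copy = copy.deepcopy(original)
worker_copy.multiply_by_two(5)
original_calls = original.calls
```

In a single process the repeated reference shares state, while a copy does not report its mutation back, so relying on state changes inside a mapped call is fragile in either setup.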
It shows here that even when the dict is a task output, it still works. Has that been changed?
k
Ah then yes in that case you need a list of different
TestObj()
. This link you gave is news to me. Let me try it 😆
You are very right. My understanding has been wrong 🤯. So then you can't do this for mapping, because the structure is a
List[Dict]
a
haha no worries! Just because it's a
Dict[List]
or because of it in combination with
flatten
?
The docs in the link I sent can be clearer then 🙂
k
It doesn’t work even without flatten I believe
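The conclusion of the thread can be reproduced in plain Python, without Prefect: a mapped call yields one dict per input, so the overall result is a list of dicts, and indexing that list with a string key raises the same `TypeError` seen in the log. The helpers `pluck` and `flatten_once` are hypothetical names for illustration; in a Prefect 1.x flow, the equivalent of `pluck` would presumably have to be its own task so that it runs once the mapped results exist.

```python
class TestObj:
    def multiply_by_two(self, num):
        return num * 2


def generate_test_obj(num_objects):
    return {"objects": [TestObj() for _ in range(num_objects)],
            "int_list": list(range(num_objects))}


# Stand-in for generate_test_obj.map(...): one dict per input value.
mapped_result = [generate_test_obj(n) for n in [1, 4, 6, 10]]

# Indexing the list of dicts with a string key fails.
try:
    mapped_result["int_list"]
    error_message = None
except TypeError as exc:
    error_message = str(exc)


def pluck(dicts, key):
    """Pull one key out of every dict, giving a list of lists."""
    return [d[key] for d in dicts]


def flatten_once(nested):
    """Flatten a list of lists by one level."""
    return [item for sub in nested for item in sub]


objects = flatten_once(pluck(mapped_result, "objects"))
int_list = flatten_once(pluck(mapped_result, "int_list"))
doubled = [obj.multiply_by_two(num) for obj, num in zip(objects, int_list)]
```

This also matches the last message: the failure comes from the key access on the list itself, so it would occur with or without flattening.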