#prefect-community
    An Hoang

    1 year ago
    Maybe I'm overthinking this, but I need to accomplish the following:
    # input: a list of X: [X1, X2, X3, ...]
    # input: total_times = 1_000_000
    # property: x.do_work(1_000_000) = 1000 * x.do_work(1000)
    # output needed: [X1.do_work(1_000_000), X2.do_work(1_000_000), ...]
    
    @task
    def long_running_task(x, n_times):
        result = x.do_work(n_times)
        return result
    I have hundreds of Dask workers and want to split this work into units of x.do_work(1000) to get the output as fast as possible by maximizing parallelism. How should I write my mapping functions to achieve this? Do I just generate a list X_list = [x] * 1000 for each X and a list iteration_list = [1000] * 1000, then do long_running_task.map(x=flatten(X_list), n_times=flatten(iteration_list))?
    Kevin Kho

    1 year ago
    I think this is right if you use Prefect for it, except that you might not need flatten, and you probably want n_times=unmapped(1000) instead of making a list.
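The split described above can be sketched in plain Python, without Prefect, to show the shape of the inputs and how the unit results regroup. This is only an illustration: the `Worker` class is a hypothetical stand-in for X, and combining unit results with `sum` is an assumption about what `do_work` returns. The list comprehension that calls the task with a constant `CHUNK` plays the role that mapping with `n_times=unmapped(1000)` would play in a flow.

```python
from dataclasses import dataclass


@dataclass
class Worker:
    """Hypothetical stand-in for X; do_work just reports iterations done."""
    name: str

    def do_work(self, n_times: int) -> int:
        return n_times


def long_running_task(x: Worker, n_times: int) -> int:
    return x.do_work(n_times)


TOTAL_TIMES = 1_000_000
CHUNK = 1_000
n_chunks = TOTAL_TIMES // CHUNK  # 1000 units of work per object

workers = [Worker("X1"), Worker("X2"), Worker("X3")]

# One entry per unit of work: each object repeated n_chunks times.
# The list is built flat, so no separate flatten step is needed.
expanded = [x for x in workers for _ in range(n_chunks)]

# Plain-Python equivalent of mapping over `expanded` with a constant n_times.
unit_results = [long_running_task(x, CHUNK) for x in expanded]

# Results are in input order, so each object's units form one contiguous slice.
# Summing is an assumed way of combining them.
per_object = [
    sum(unit_results[i * n_chunks:(i + 1) * n_chunks])
    for i in range(len(workers))
]
print(per_object)
```

Building the expanded list flat from the start is why flatten may be unnecessary here; it would only be needed if each X produced its own nested list.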
    An Hoang

    1 year ago
    @Kevin Kho so I tried something like the minimum example below:
    # Imports assumed for the Prefect Core version in use at the time
    from prefect import Flow, task, flatten

    class TestObj:

        def multiply_by_two(self, num):
            return num * 2

    @task
    def generate_test_obj(num_objects):
        return {"objects": [TestObj() for _ in range(num_objects)],
                "int_list": list(range(num_objects))}

    @task
    def call_multiply_by_two(obj, num):
        return obj.multiply_by_two(num)

    with Flow("test_flow") as test_flow:
        int_arr = [1, 4, 6, 10]
        generate_test_obj_result = generate_test_obj.map(num_objects=int_arr)
        result = call_multiply_by_two.map(
            obj=flatten(generate_test_obj_result["objects"]),
            num=flatten(generate_test_obj_result["int_list"]),
        )

    test_flow.visualize()
    Here's the error I got:
    [2021-09-09 14:26:39-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow'
    [2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj': Starting task run...
    [2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj': Finished task run for task with final state: 'Mapped'
    [2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[0]': Starting task run...
    [2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[0]': Finished task run for task with final state: 'Success'
    [2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[1]': Starting task run...
    [2021-09-09 14:26:39-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[1]': Finished task run for task with final state: 'Success'
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[2]': Starting task run...
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[2]': Finished task run for task with final state: 'Success'
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[3]': Starting task run...
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj[3]': Finished task run for task with final state: 'Success'
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['int_list']': Starting task run...
    [2021-09-09 14:26:40-0400] ERROR - prefect.TaskRunner | Task "generate_test_obj['int_list']": Exception encountered during task execution!
    Traceback (most recent call last):
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 441, in method
        return run_method(self, *args, **kwargs)
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/tasks/core/operators.py", line 38, in run
        return task_result[key]
    TypeError: list indices must be integers or slices, not str
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['int_list']': Finished task run for task with final state: 'Failed'
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['objects']': Starting task run...
    [2021-09-09 14:26:40-0400] ERROR - prefect.TaskRunner | Task "generate_test_obj['objects']": Exception encountered during task execution!
    Traceback (most recent call last):
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 441, in method
        return run_method(self, *args, **kwargs)
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/tasks/core/operators.py", line 38, in run
        return task_result[key]
    TypeError: list indices must be integers or slices, not str
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'generate_test_obj['objects']': Finished task run for task with final state: 'Failed'
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'call_multiply_by_two': Starting task run...
    [2021-09-09 14:26:40-0400] INFO - prefect.TaskRunner | Task 'call_multiply_by_two': Finished task run for task with final state: 'Failed'
    [2021-09-09 14:26:40-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    <Failed: "Some reference tasks failed.">
    Do you know why that didn't work?
    Kevin Kho

    1 year ago
    I think this is because you can’t do generate_test_obj_result["objects"]. List access is run during build time, but there is no value yet because of deferred execution. You need a task that will pull out objects to defer the execution to when generate_test_obj_result has a value.
    Also I think you can do:
    result = call_multiply_by_two.map(obj = unmapped(TestObj()), num = flatten(generate_test_obj_result["int_list"]))
    where unmapped is imported from prefect.
    See this
    An Hoang

    1 year ago
    Ah thanks! I have used dict key access before and it worked, but it's a special case this time since the value in the key is a list and flatten needs access to that list during build time, right?
    Kevin Kho

    1 year ago
    It works if the dict is already defined during build time (not a task output)
    An Hoang

    1 year ago
    Would
    result = call_multiply_by_two.map(obj = unmapped(TestObj()), num = flatten(generate_test_obj_result["int_list"]))
    create multiple pointers to the same TestObj? What happens if multiply_by_two changes the object state? How does parallelism work in this case?
    It shows here that even when the dict is a task output, it still works. Has that been changed?
    Kevin Kho

    1 year ago
    Ah, then yes, in that case you need a list of different TestObj() instances. This link you gave is news to me. Let me try it 😆
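The difference between one shared object and a list of different objects can be shown in plain Python. This is a minimal sketch, not Prefect code: `Counter` is a hypothetical stateful stand-in for TestObj, and `copy.deepcopy` is used only as a stand-in for the copy a distributed executor would typically make when it serializes an argument and sends it to a worker process.

```python
import copy


class Counter:
    """Hypothetical stateful object standing in for TestObj."""

    def __init__(self):
        self.calls = 0

    def multiply_by_two(self, num):
        self.calls += 1  # state change on every call
        return num * 2


# [obj] * 3 stores three references to ONE object.
shared = [Counter()] * 3
# A comprehension creates three independent objects.
distinct = [Counter() for _ in range(3)]

for obj in shared:
    obj.multiply_by_two(1)
for obj in distinct:
    obj.multiply_by_two(1)

shared_calls = [obj.calls for obj in shared]
distinct_calls = [obj.calls for obj in distinct]

# Stand-in for shipping an object to another process: the copy's state
# changes are not visible on the original.
original = Counter()
worker_copy = copy.deepcopy(original)
worker_copy.multiply_by_two(1)

print(shared_calls, distinct_calls, original.calls, worker_copy.calls)
```

If the method mutates state, sharing one object gives results that depend on where each call ran: in a single process every call sees the same state, while copies on separate workers each keep their own.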
    You are right; my understanding was wrong 🤯. So yes, you can’t do this with mapping because the structure is a List[Dict].
    An Hoang

    1 year ago
    Haha, no worries! Is it just because it's a Dict[List], or because of that in combination with flatten?
    The docs in the link I sent could be clearer then 🙂
    Kevin Kho

    1 year ago
    It doesn’t work even without flatten I believe
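The List[Dict] shape discussed in this thread can be reproduced in plain Python. The sketch below is not Prefect code; it imitates a mapped task's output with a list comprehension and uses `object()` as a placeholder for TestObj. It shows that indexing the list with a string key fails with the same TypeError as in the log, with no flatten involved, and that pulling the key out of each element first gives a nested list that can then be flattened.

```python
from itertools import chain


def generate_test_obj(num_objects):
    # Same return shape as the task in the thread; object() is a placeholder.
    return {"objects": [object() for _ in range(num_objects)],
            "int_list": list(range(num_objects))}


# Mapping over [1, 4, 6, 10] yields one dict per input: a List[Dict].
mapped_result = [generate_test_obj(n) for n in [1, 4, 6, 10]]

# Indexing the list with a string key fails, with or without flatten.
try:
    mapped_result["int_list"]
except TypeError as exc:
    error_message = str(exc)

# Extract the key from each element, then flatten the nested list.
nested = [d["int_list"] for d in mapped_result]
flat = list(chain.from_iterable(nested))

print(error_message)
print(flat)
```

In a flow, one option in line with the earlier suggestion in this thread would be a small task mapped over the results that returns the wanted key from each dict, with flatten applied to that task's output.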