Hi, I am having trouble using the map functionalit...
# ask-community
t
Hi, I am having trouble using the map functionality. When I use map, not all tasks are scheduled, and after the scheduled tasks complete, the flow hangs in a running state.
This code should reproduce the issue:
import numpy as np
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect_dask import DaskTaskRunner


@task(cache_key_fn=task_input_hash)
def fun_1(n):
    return n ** 2


@task(cache_key_fn=task_input_hash)
def fun_2(n):
    return 2 * n + 1


@task(cache_key_fn=task_input_hash)
def fun_3(n, m):
    return n + m


@flow(name="map-bug",
      cache_result_in_memory=False,
      persist_result=True,
      result_storage="local-file-system/buchtimo-laptop-storage",
      task_runner=DaskTaskRunner())
def map_bug_flow():
    data = np.arange(200)

    f1 = fun_1.map(data)
    
    f2 = fun_2.map(f1)
    
    f3 = fun_3.map(data, f2)


if __name__ == "__main__":
    map_bug_flow()
If I replace the map calls with a for-loop and submit, like here:
import numpy as np
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect_dask import DaskTaskRunner


@task(cache_key_fn=task_input_hash)
def fun_1(n):
    return n ** 2


@task(cache_key_fn=task_input_hash)
def fun_2(n):
    return 2 * n + 1


@task(cache_key_fn=task_input_hash)
def fun_3(n, m):
    return n + m


@flow(name="map-bug",
      cache_result_in_memory=False,
      persist_result=True,
      result_storage="local-file-system/buchtimo-laptop-storage",
      task_runner=DaskTaskRunner())
def map_bug_flow():
    data = np.arange(200)

    for d in data:
        f1 = fun_1.submit(d)

        f2 = fun_2.submit(f1)

        f3 = fun_3.submit(d, f2)


if __name__ == "__main__":
    map_bug_flow()
it works.
I would expect 600 tasks to be scheduled and executed. The version using map creates only about 300 tasks: some for f1, a bunch for f2, and a few for f3.
Am I using map the wrong way, or is this a bug?
(I deployed the flow and run it from the cloud. Not sure if that matters.)
a
What error do you get? Can you try wrapping np.arange in a list()?
t
I don't get an error, but I don't get all the results either.
Expected results would be that fun_1 gets executed 200 times (once for each element), then fun_2 gets applied to the results of fun_1 and creates another 200 outputs. Then fun_3 is applied to the initial 200 elements and the results of fun_2. In total that should be 600 function/task calls. However, only a fraction is scheduled and executed.
Submitting the tasks via for-loop works and finishes quickly. With the map approach it hangs.
Setting
data = list(range(200))
results in the same behaviour.
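To make the expected numbers concrete, here is a minimal plain-Python sketch of what the three mapped stages should produce. It is only an illustration under assumptions: the functions are ordinary Python stand-ins for the tasks (no Prefect, no Dask), and `expected_task_runs` is a name made up for this example.

```python
# Plain-Python stand-ins for the three tasks (assumption: no Prefect or Dask involved).
def fun_1(n):
    return n ** 2


def fun_2(n):
    return 2 * n + 1


def fun_3(n, m):
    return n + m


data = list(range(200))

# Element-wise mapping: each stage yields exactly one result per input element.
f1 = [fun_1(d) for d in data]
f2 = [fun_2(x) for x in f1]
# The two-argument stage pairs the i-th input with the i-th result of fun_2.
f3 = [fun_3(d, y) for d, y in zip(data, f2)]

# Hypothetical name: total number of function calls across the three stages.
expected_task_runs = len(f1) + len(f2) + len(f3)
print(expected_task_runs)  # 600
```

Each stage has 200 results, so a complete run should show 200 + 200 + 200 = 600 task runs, not roughly 300.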
a
The main difference here is that with map, all tasks are first prepared and then submitted at once. There should be no difference in the final result, though. If you see one, feel free to submit an issue with a minimal reproducible example. Btw, using a for loop is a totally viable approach; you don't necessarily need to use mapping.
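As a rough analogy for "same outcome either way", here is a sketch that uses the standard library's `concurrent.futures` as a stand-in for a task runner. This is an assumption for illustration only and says nothing about how Prefect or Dask schedule work internally. It compares handing whole iterables to the executor against submitting one chain per element.

```python
from concurrent.futures import ThreadPoolExecutor


def fun_1(n):
    return n ** 2


def fun_2(n):
    return 2 * n + 1


def fun_3(n, m):
    return n + m


data = list(range(200))

# ThreadPoolExecutor is a stand-in here, not Prefect's task runner.
with ThreadPoolExecutor(max_workers=4) as pool:
    # Batch style: hand the whole iterable to the executor, stage by stage.
    m1 = list(pool.map(fun_1, data))
    m2 = list(pool.map(fun_2, m1))
    mapped = list(pool.map(fun_3, data, m2))

    # Loop style: submit one three-step chain per element.
    looped = []
    for d in data:
        s1 = pool.submit(fun_1, d)
        s2 = pool.submit(fun_2, s1.result())
        s3 = pool.submit(fun_3, d, s2.result())
        looped.append(s3.result())

# Both styles should yield the same 200 final values.
assert mapped == looped
```

If the two styles disagree in a real flow, as described in this thread, that points to a bug rather than to intended behaviour.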
t
In case someone finds this on Slack, the corresponding issue is here: https://github.com/PrefectHQ/prefect/issues/7934