# ask-community
k
Hello Prefect Community! A question about mapped tasks and concurrency - we are not sure what is supposed to happen.
Copy code
Do mapped tasks run concurrently? Or does concurrency only happen when a dask executor is used?
Thanks!
@Noam polak
a
Your intuition is correct: mapped tasks only run in parallel if you attach one of the Dask executors. By default the `LocalExecutor` is used, which runs all tasks sequentially, even mapped tasks. To easily make your tasks run in parallel, you can attach a `LocalDaskExecutor`:
Copy code
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))

with Flow('Map Reduce', executor=LocalDaskExecutor()) as flow:
    mapped_result = map_fn.map(numbers)
    reduced_result = reduce_fn(mapped_result)
For more on Dask executors, check out https://discourse.prefect.io/t/how-can-i-configure-my-flow-to-run-with-dask/45
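In case it's useful, here is a minimal sketch of how the executor can be tuned, assuming the Prefect 1.x signatures where `LocalDaskExecutor` accepts `scheduler`/`num_workers` and `DaskExecutor` accepts an `address` for an existing cluster:
Copy code
from prefect import Flow, task
from prefect.executors import DaskExecutor, LocalDaskExecutor

double = task(lambda x: x * 2)

# Parallelism on the local machine: thread-based by default, or process-based
local_parallel = LocalDaskExecutor(scheduler="processes", num_workers=4)

# Alternatively, submit work to an existing distributed Dask cluster
# distributed = DaskExecutor(address="tcp://dask-scheduler:8786")

with Flow("parallel_mapping", executor=local_parallel) as flow:
    doubled = double.map([1, 2, 3, 4])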
n
That's great, thanks!
We want to create a mapped task that retries N times and then proceeds to the downstream task on an "any_successful" trigger. Is it possible?
Copy code
#flow.py

exists_items = wait_list_creation.map(some_list)
used_upstream_data = use_upstream_data(
    input_data,
    exists_items,
    upstream_tasks=[exists_items],
)

#tasks.py

from datetime import timedelta

from prefect import task
from prefect.triggers import any_successful


@task(max_retries=30, retry_delay=timedelta(minutes=1))
def wait_list_creation(item_id: str) -> str:
    item_status = try_create_item(item_id)
    if item_status:
        return item_id
    raise Exception(f"item {item_id} was not created")


@task(trigger=any_successful)
def use_upstream_data(input_data, exists_items) -> dict:
    res = do_something(input_data, exists_items)
    return res
a
I believe what you're trying to accomplish is actually the default Prefect behavior: downstream tasks such as your `use_upstream_data` task only get called once the upstream tasks finish successfully. What's missing is that you would need to run this downstream task as a mapped task as well, so that it gets called only for the mapped child tasks that finished successfully.
Copy code
from datetime import timedelta
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.triggers import any_successful
from prefect.tasks.notifications import SlackTask
from random import random


@task(log_stdout=True, trigger=any_successful)
def print_mapped_output(x):
    message = f"Final result: {x}"
    print(message)
    SlackTask(message=message).run()
    return x


@task(
    log_stdout=True, max_retries=2, retry_delay=timedelta(seconds=2),
)
def randomly_fail(x):
    print(f"Got input: {x}")
    nr = random()
    print(f"Number deciding whether task fails or not: {nr}")
    if nr > 0.5:
        raise ValueError("Too big number!")
    return x * 10


with Flow(
    name="flow_with_tasks_failing_sometimes", executor=LocalDaskExecutor()
) as flow:
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    results = randomly_fail.map(numbers)
    print_mapped_output.map(results)
Here is how one flow run looks in the UI when you register and run the flow:
And here is the Slack message I got with the successful results: you can see that numbers 3 (mapped index 2) and 9 (mapped index 7) didn't finish successfully even after all configured retries, which is why the downstream task `print_mapped_output` wasn't run for them and I didn't get a Slack message for those.
But if you wish to run the final task only once rather than for each mapped upstream task, you could do:
Copy code
from datetime import timedelta
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.triggers import any_successful
from prefect.tasks.notifications import SlackTask
from random import random


@task(log_stdout=True, trigger=any_successful)
def print_mapped_output(x):
    message = f"Final result: {x}"
    print(message)
    SlackTask(message=message).run()
    return x


@task(
    log_stdout=True, max_retries=2, retry_delay=timedelta(seconds=2),
)
def randomly_fail(x):
    print(f"Got input: {x}")
    nr = random()
    print(f"Number deciding whether task fails or not: {nr}")
    if nr > 0.5:
        raise ValueError("Too big number!")
    return x * 10


with Flow(
    name="flow_with_tasks_failing_sometimes_running_final_task_once",
    executor=LocalDaskExecutor(),
) as flow:
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    results = randomly_fail.map(numbers)
    print_mapped_output(results)
Result:
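A related option, if you want the single final task to see only the values from children that eventually succeeded: a minimal sketch assuming `FilterTask` from `prefect.tasks.control_flow` keeps its documented defaults (it drops `Exception`/`NoResult` values and runs on an `all_finished` trigger):
Copy code
from datetime import timedelta
from random import random

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.tasks.control_flow import FilterTask

# Default filter_func drops Exceptions/NoResult, i.e. the results of failed children
filter_successful = FilterTask()


@task(max_retries=2, retry_delay=timedelta(seconds=2))
def randomly_fail(x):
    if random() > 0.5:
        raise ValueError("Too big number!")
    return x * 10


@task(log_stdout=True)
def reduce_results(x):
    # x contains only the results of children that succeeded (possibly after retries)
    print(f"Successful results: {x}")
    return x


with Flow(
    name="flow_filtering_failed_mapped_results", executor=LocalDaskExecutor()
) as flow:
    results = randomly_fail.map([1, 2, 3, 4, 5, 6, 7, 8, 9])
    successful = filter_successful(results)
    reduce_results(successful)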
n
Hey, thanks for the detailed response, it helped a lot!
🙌 1
k
@Anna Geller suggested a change to the docs here https://github.com/PrefectHQ/prefect/pull/5587
🙌 1
🙏 1
a
thanks a lot!