Khen Price

03/06/2022, 10:43 AM
Hello Prefect Community! A question about mapped tasks and concurrency - we are not sure what is supposed to happen.
Do mapped tasks run concurrently? Or does concurrency only happen when a dask executor is used?
Thanks!
@Noam polak
Anna Geller

03/06/2022, 12:37 PM
Your intuition is correct: mapped tasks only run in parallel if you attach one of the Dask executors. By default, the `LocalExecutor` is used, which runs all tasks sequentially, even mapped ones. To make your tasks run in parallel, you can attach a `LocalDaskExecutor`:
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))

with Flow('Map Reduce', executor=LocalDaskExecutor()) as flow:
    mapped_result = map_fn.map(numbers)
    reduced_result = reduce_fn(mapped_result)
For more on Dask executors, check out https://discourse.prefect.io/t/how-can-i-configure-my-flow-to-run-with-dask/45
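To make the difference concrete without a Prefect installation, here is a rough standard-library analogy. It is not how Prefect is implemented: a plain loop stands in for the sequential `LocalExecutor`, and a `ThreadPoolExecutor` stands in for `LocalDaskExecutor`, which as far as I know uses a thread-based scheduler by default. The name `slow_add_one` and the 0.2 s sleep are illustrative assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add_one(x):
    """Stand-in for a mapped task that does some I/O-bound work."""
    time.sleep(0.2)  # assumed delay, purely illustrative
    return x + 1


numbers = [1, 2, 3, 4]

# Sequential mapping: each call finishes before the next one starts.
start = time.perf_counter()
sequential_result = [slow_add_one(n) for n in numbers]
sequential_seconds = time.perf_counter() - start

# Thread-pool mapping: the calls overlap while they wait.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(numbers)) as pool:
    parallel_result = list(pool.map(slow_add_one, numbers))
parallel_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, threaded: {parallel_seconds:.2f}s")
```

Both variants produce the same values; only the wall-clock time differs, which is the effect you should expect when switching executors on a flow with mapped tasks.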
Noam polak

03/06/2022, 1:35 PM
That's great, thanks!
We want to create a mapped task that retries N times and then proceeds to the downstream task if "any_successful". Is that possible?
# flow.py

exists_items = wait_list_creation.map(some_list)
used_upstream_data = use_upstream_data(
    input_data,
    exists_items,
    upstream_tasks=[exists_items],
)

# tasks.py
from datetime import timedelta

from prefect import task
from prefect.triggers import any_successful


@task(max_retries=30, retry_delay=timedelta(minutes=1))
def wait_list_creation(item_id: str) -> str:
    # Return the ID once the item exists; raising makes Prefect retry.
    item_status = try_create_item(item_id)
    if item_status:
        return item_id
    raise Exception(f"item {item_id} was not created")


@task(trigger=any_successful)
def use_upstream_data(input_data, exists_items) -> dict:
    res = do_something(input_data, exists_items)
    return res
Anna Geller

03/06/2022, 3:02 PM
I believe what you are trying to accomplish is actually the default Prefect behavior: by default, downstream tasks such as your `use_upstream_data` task run only when their upstream tasks finish successfully. What's missing is that you need to run this downstream task as a mapped task, so that it gets called only for the mapped child tasks that finished successfully.
from datetime import timedelta
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.triggers import any_successful
from prefect.tasks.notifications import SlackTask
from random import random


@task(log_stdout=True, trigger=any_successful)
def print_mapped_output(x):
    message = f"Final result: {x}"
    print(message)
    SlackTask(message=message).run()
    return x


@task(
    log_stdout=True, max_retries=2, retry_delay=timedelta(seconds=2),
)
def randomly_fail(x):
    print(f"Got input: {x}")
    nr = random()
    print(f"Number deciding whether task fails or not: {nr}")
    if nr > 0.5:
        raise ValueError("Too big number!")
    return x * 10


with Flow(
    name="flow_with_tasks_failing_sometimes", executor=LocalDaskExecutor()
) as flow:
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    results = randomly_fail.map(numbers)
    print_mapped_output.map(results)
Here is what one flow run looks like in the UI when you register and run the flow:
And here is the Slack message that I got with successful results. You can see that numbers 3 (mapped index of the task is 2) and 9 (mapped index of the task is 7) didn't finish successfully even after all configured retries, which is why the downstream task `print_mapped_output` wasn't run for them and I didn't get a Slack message for those.
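The retry-then-continue behavior described above can be mimicked in plain Python. This is only a sketch of the semantics, not Prefect's API: `run_with_retries` and `flaky_times_ten` are hypothetical helpers, the failure pattern is deterministic so the outcome is reproducible, and the retry delay is left out for brevity.

```python
def run_with_retries(fn, arg, max_retries):
    """Call fn(arg), retrying up to max_retries times; return (ok, value_or_error)."""
    for _ in range(max_retries + 1):  # one initial attempt plus the retries
        try:
            return True, fn(arg)
        except Exception as exc:
            last_error = exc
    return False, last_error


attempts = {}  # how many times each input has been tried


def flaky_times_ten(x):
    """Hypothetical task: even inputs fail once, input 3 always fails."""
    attempts[x] = attempts.get(x, 0) + 1
    if x == 3 or (x % 2 == 0 and attempts[x] == 1):
        raise ValueError(f"failed for {x}")
    return x * 10


numbers = [1, 2, 3, 4]
outcomes = [run_with_retries(flaky_times_ten, n, max_retries=2) for n in numbers]

# Mapped downstream step: runs once per child that eventually succeeded.
downstream_results = [f"Final result: {value}" for ok, value in outcomes if ok]
print(downstream_results)
```

Input 3 exhausts its retries and is simply absent from the downstream results, which matches the missing Slack messages for the failed children.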
But if you wish to run the final task only once rather than for each mapped upstream task, you could do:
from datetime import timedelta
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.triggers import any_successful
from prefect.tasks.notifications import SlackTask
from random import random


@task(log_stdout=True, trigger=any_successful)
def print_mapped_output(x):
    message = f"Final result: {x}"
    print(message)
    SlackTask(message=message).run()
    return x


@task(
    log_stdout=True, max_retries=2, retry_delay=timedelta(seconds=2),
)
def randomly_fail(x):
    print(f"Got input: {x}")
    nr = random()
    print(f"Number deciding whether task fails or not: {nr}")
    if nr > 0.5:
        raise ValueError("Too big number!")
    return x * 10


with Flow(
    name="flow_with_tasks_failing_sometimes_running_final_task_once",
    executor=LocalDaskExecutor(),
) as flow:
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    results = randomly_fail.map(numbers)
    print_mapped_output(results)
Result:
Noam polak

03/08/2022, 10:04 AM
Hey, thanks for the detailed response, it helped a lot.
🙌 1
Khen Price

03/22/2022, 1:36 PM
@Anna Geller suggested a change to the docs here https://github.com/PrefectHQ/prefect/pull/5587
🙌 1
🙏 1
Anna Geller

03/22/2022, 5:06 PM
thanks a lot!