Khen Price
03/06/2022, 10:43 AM
Do mapped tasks run concurrently? Or does concurrency only happen when a Dask executor is used?
Thanks!
Anna Geller
03/06/2022, 12:37 PM
By default, the `LocalExecutor` is used, which runs all tasks sequentially, even mapped tasks.
To easily make your tasks run in parallel, you can attach a `LocalDaskExecutor`:
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))

with Flow('Map Reduce', executor=LocalDaskExecutor()) as flow:
    mapped_result = map_fn.map(numbers)
    reduced_result = reduce_fn(mapped_result)
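To make the sequential-versus-parallel difference concrete without needing Prefect installed, here is a minimal standard-library sketch of the same map/reduce shape. It is only an analogy: `add_one`, `run_sequential`, and `run_threaded` are hypothetical helper names, and `ThreadPoolExecutor` merely stands in for a thread-based executor. It does not show how Prefect or Dask schedule work internally.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def add_one(x):
    # Simulate a small amount of I/O-bound work per mapped item.
    time.sleep(0.2)
    return x + 1


def run_sequential(numbers):
    # One item after another, like a purely sequential executor.
    return sum(add_one(n) for n in numbers)


def run_threaded(numbers, num_workers=3):
    # Mapped items run in a thread pool; the reduce step waits for all of them.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        mapped = list(pool.map(add_one, numbers))
    return sum(mapped)


if __name__ == "__main__":
    numbers = [1, 2, 3]
    for runner in (run_sequential, run_threaded):
        start = time.perf_counter()
        total = runner(numbers)
        elapsed = time.perf_counter() - start
        print(f"{runner.__name__}: total={total}, elapsed={elapsed:.2f}s")
```

Both runners compute the same total; with three workers, the threaded version should take roughly one sleep interval instead of three.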
For more on Dask executors, check out https://discourse.prefect.io/t/how-can-i-configure-my-flow-to-run-with-dask/45
Noam polak
03/06/2022, 1:35 PM
# flow.py
exists_items = wait_list_creation.map(some_list)
used_upstream_data = use_upstream_data(
    input_data,
    exists_items,
    upstream_tasks=[exists_items],
)

# tasks.py
from datetime import timedelta

from prefect import task
from prefect.triggers import any_successful


@task(max_retries=30, retry_delay=timedelta(minutes=1))
def wait_list_creation(item_id: str) -> str:
    # Raising makes Prefect retry (up to 30 times, one minute apart).
    item_status = try_create_item(item_id)
    if item_status:
        return item_id
    raise Exception(f"item {item_id} wasn't created")


@task(trigger=any_successful)
def use_upstream_data(input_data, exists_items) -> dict:
    res = do_something(input_data, exists_items)
    return res
Anna Geller
03/06/2022, 3:02 PM
`use_upstream_data` gets called only when the upstream tasks finish successfully. What's missing is that you need to run this downstream task as a mapped task, so that it gets called only for the mapped child tasks that finished successfully.
from datetime import timedelta
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.triggers import any_successful
from prefect.tasks.notifications import SlackTask
from random import random


@task(log_stdout=True, trigger=any_successful)
def print_mapped_output(x):
    message = f"Final result: {x}"
    print(message)
    SlackTask(message=message).run()
    return x


@task(
    log_stdout=True, max_retries=2, retry_delay=timedelta(seconds=2),
)
def randomly_fail(x):
    print(f"Got input: {x}")
    nr = random()
    print(f"Number deciding whether task fails or not: {nr}")
    if nr > 0.5:
        raise ValueError("Too big number!")
    return x * 10


with Flow(
    name="flow_with_tasks_failing_sometimes", executor=LocalDaskExecutor()
) as flow:
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    results = randomly_fail.map(numbers)
    print_mapped_output.map(results)
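The per-child behavior described above (the mapped downstream task runs only for children that succeeded) can be illustrated with a small plain-Python sketch. This is not Prefect code and makes no claim about Prefect internals: `fail_on_even`, `run_child`, and `map_downstream` are hypothetical names, and the failure rule is made deterministic (even inputs fail) so the outcome is reproducible.

```python
def fail_on_even(x):
    # Deterministic stand-in for the flaky task: even inputs fail.
    if x % 2 == 0:
        raise ValueError("Too big number!")
    return x * 10


def run_child(fn, x):
    # Capture each child's outcome instead of letting one failure abort the rest.
    try:
        return ("success", fn(x))
    except Exception as exc:
        return ("failed", exc)


def map_downstream(fn, upstream_outcomes):
    # Run the downstream function once per successful upstream child only.
    return [fn(value) for state, value in upstream_outcomes if state == "success"]


if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]
    outcomes = [run_child(fail_on_even, n) for n in numbers]
    messages = map_downstream(lambda v: f"Final result: {v}", outcomes)
    print(messages)
```

In this sketch, inputs 2 and 4 fail, so the downstream function produces messages only for 1, 3, and 5.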
Here is what one flow run looks like in the UI when you register and run the flow:
`print_mapped_output` wasn't run for the failed child tasks, and I didn't get a Slack message for those.
from datetime import timedelta
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.triggers import any_successful
from prefect.tasks.notifications import SlackTask
from random import random


@task(log_stdout=True, trigger=any_successful)
def print_mapped_output(x):
    message = f"Final result: {x}"
    print(message)
    SlackTask(message=message).run()
    return x


@task(
    log_stdout=True, max_retries=2, retry_delay=timedelta(seconds=2),
)
def randomly_fail(x):
    print(f"Got input: {x}")
    nr = random()
    print(f"Number deciding whether task fails or not: {nr}")
    if nr > 0.5:
        raise ValueError("Too big number!")
    return x * 10


with Flow(
    name="flow_with_tasks_failing_sometimes_running_final_task_once",
    executor=LocalDaskExecutor(),
) as flow:
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    results = randomly_fail.map(numbers)
    print_mapped_output(results)
Result:
Noam polak
03/08/2022, 10:04 AM
Khen Price
03/22/2022, 1:36 PM
Anna Geller
03/22/2022, 5:06 PM