Neil Natarajan

08/11/2022, 8:59 PM
In Prefect 2.0, what is the best way to chain results between mapped tasks? Specifically, how should errors on specific iterations be handled if the remaining results are still desired? This seemed seamless in Prefect 1.x, but I am running into issues while migrating to 2.0.

Khuyen Tran

08/11/2022, 9:21 PM
You could add .submit() to each task to make the tasks inside your flow run asynchronously: https://docs.prefect.io/concepts/task-runners/#using-a-task-runner

Neil Natarajan

08/11/2022, 9:26 PM
I see, so task mapping in 2.0 is actually just synchronous? And .submit() is what makes things async now?
@prefect-admin

Jeff Hale

08/11/2022, 10:34 PM
Check out the examples of mapping here. .map() will submit to the task runner automatically.

Nate

08/11/2022, 11:28 PM
@Neil Natarajan it should execute concurrently by default, for example:
from prefect import flow, task

@task
def square(x: int) -> int:
    return x**2

@flow
def my_flow():
    numbers = list(range(10))
    # .map() creates one task run per element and submits them to the task runner
    squared_numbers = square.map(numbers)

my_flow()
will give logs like
18:27:44.809 | INFO    | prefect.engine - Created flow run 'elegant-coua' for flow 'my-flow'
...
18:27:45.846 | INFO    | Task run 'square-2b9b4656-0' - Finished in state Completed()
18:27:45.904 | INFO    | Task run 'square-2b9b4656-2' - Finished in state Completed()
18:27:45.912 | INFO    | Task run 'square-2b9b4656-8' - Finished in state Completed()
18:27:45.915 | INFO    | Task run 'square-2b9b4656-5' - Finished in state Completed()
18:27:45.925 | INFO    | Task run 'square-2b9b4656-3' - Finished in state Completed()
18:27:45.936 | INFO    | Task run 'square-2b9b4656-6' - Finished in state Completed()
18:27:45.936 | INFO    | Task run 'square-2b9b4656-7' - Finished in state Completed()
18:27:45.957 | INFO    | Task run 'square-2b9b4656-4' - Finished in state Completed()
18:27:46.015 | INFO    | Task run 'square-2b9b4656-9' - Finished in state Completed()
18:27:46.140 | INFO    | Task run 'square-2b9b4656-1' - Finished in state Completed()
18:27:46.252 | INFO    | Flow run 'elegant-coua' - Finished in state Completed('All states completed.')

Neil Natarajan

08/12/2022, 3:28 PM
Thank you all for the help.
Circling back to the original question: what if there are two mapped tasks that chain input, like in this example?
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task(name="map_task")
def map_task(x, y):
    # Fail on purpose for a single iteration
    if x == 2:
        raise Exception
    return x + y


@task(name="map task 2")
def map_task_2(nr):
    print(f"Map Task 2: {time.time()}: {nr}")
    time.sleep(5)
    print(f"Map Task 2: {time.time()} : {nr}")


@flow(name="outer flow", task_runner=DaskTaskRunner())
def outer_flow(numbers, static_num):
    futures = map_task.map(x=numbers, y=[static_num] * len(numbers))
    # Chain the second mapped task onto the futures of the first
    map_task_2.map(nr=futures)

if __name__ == "__main__":
    workflow_result = outer_flow([1, 2, 3, 4, 5], 2)
In this example, I still want the results for the iterations where x is 1, 3, 4, or 5, but when I run this the entire workflow fails and none of the other iterations run.

Khuyen Tran

08/12/2022, 3:33 PM
It is because map_task_2.map(nr=futures) is blocked by futures = map_task.map(x=numbers, y=[static_num] * len(numbers)). For example,
from prefect import flow, task
from time import sleep 

@task(name="Task 1")
def task_1():
    sleep(5)

@task(name="Task 2")
def task_2():
    sleep(2)


@flow
def my_flow():
    task_1()
    task_2()


if __name__ == "__main__":
    my_flow()
Output:
09:37:44.528 | INFO    | prefect.engine - Created flow run 'prehistoric-hyrax' for flow 'my-flow'
09:37:45.275 | INFO    | Flow run 'prehistoric-hyrax' - Created task run 'Task 1-1619c4ff-0' for task 'Task 1'
09:37:45.276 | INFO    | Flow run 'prehistoric-hyrax' - Executing 'Task 1-1619c4ff-0' immediately...
09:37:50.594 | INFO    | Task run 'Task 1-1619c4ff-0' - Finished in state Completed()
09:37:50.679 | INFO    | Flow run 'prehistoric-hyrax' - Created task run 'Task 2-49f2f021-0' for task 'Task 2'
09:37:50.680 | INFO    | Flow run 'prehistoric-hyrax' - Executing 'Task 2-49f2f021-0' immediately...
09:37:52.938 | INFO    | Task run 'Task 2-49f2f021-0' - Finished in state Completed()
09:37:53.041 | INFO    | Flow run 'prehistoric-hyrax' - Finished in state Completed('All states completed.')
Task 2 will wait for Task 1 to finish executing. But adding .submit() will make the tasks run concurrently:
from prefect import flow, task
from time import sleep 

@task(name="Task 1")
def task_1():
    sleep(5)

@task(name="Task 2")
def task_2():
    sleep(2)


@flow
def my_flow():
    task_1.submit()
    task_2.submit()


if __name__ == "__main__":
    my_flow()
Output:
09:38:35.746 | INFO    | prefect.engine - Created flow run 'amiable-marmot' for flow 'my-flow'
09:38:36.462 | INFO    | Flow run 'amiable-marmot' - Created task run 'Task 1-1619c4ff-0' for task 'Task 1'
09:38:36.463 | INFO    | Flow run 'amiable-marmot' - Submitted task run 'Task 1-1619c4ff-0' for execution.
09:38:36.603 | INFO    | Flow run 'amiable-marmot' - Created task run 'Task 2-49f2f021-0' for task 'Task 2'
09:38:36.603 | INFO    | Flow run 'amiable-marmot' - Submitted task run 'Task 2-49f2f021-0' for execution.
09:38:38.850 | INFO    | Task run 'Task 2-49f2f021-0' - Finished in state Completed()
09:38:41.711 | INFO    | Task run 'Task 1-1619c4ff-0' - Finished in state Completed()
09:38:41.810 | INFO    | Flow run 'amiable-marmot' - Finished in state Completed('All states completed.')
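The timing difference in the two logs above (roughly 7 seconds versus roughly 5 seconds) is the usual sequential-versus-submitted pattern. As a rough analogy only, here is the same comparison sketched with the standard library's concurrent.futures. ThreadPoolExecutor stands in for a task runner here and is an assumption of this sketch, not a description of how Prefect executes tasks; the function names run_sequentially and run_submitted are made up for illustration, and the sleeps are shortened to keep it quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def task_1() -> str:
    time.sleep(0.5)  # stands in for the 5-second task
    return "task_1"


def task_2() -> str:
    time.sleep(0.2)  # stands in for the 2-second task
    return "task_2"


def run_sequentially() -> float:
    """Call the functions directly: task_2 starts only after task_1 returns."""
    start = time.perf_counter()
    task_1()
    task_2()
    return time.perf_counter() - start


def run_submitted() -> float:
    """Submit both functions to a pool, then wait for both to finish."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(task_1), pool.submit(task_2)]
        for future in futures:
            future.result()  # block until this function has returned
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {run_sequentially():.2f}s")
    print(f"submitted:  {run_submitted():.2f}s")
```

In the sequential version the total is about the sum of the two sleeps; in the submitted version it is about the longer of the two, which matches the shape of the Prefect logs above.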
So to make sure your second task still runs when the first task fails, do something like this:
import time

from prefect import flow, task


@task(name="map_task")
def map_task(x, y):
    if x == 2:
        raise Exception
    return x + y


@task(name="map task 2")
def map_task_2(nr):
    print(f"Map Task 2: {time.time()}: {nr}")
    time.sleep(5)
    print(f"Map Task 2: {time.time()} : {nr}")


@flow(name="outer flow")
def outer_flow(numbers, static_num):
    # Submit one task run per item instead of using .map()
    futures = [map_task.submit(x=num, y=static_num) for num in numbers]
    # Chain the second task onto each future individually
    for future in futures:
        map_task_2.submit(nr=future)


if __name__ == "__main__":
    workflow_result = outer_flow([1, 2, 3, 4, 5], 2)
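For anyone who wants to see the "one future per item, keep the results that succeeded" idea in isolation, here is a minimal sketch using only the standard library. It is an analogy, not Prefect code: ThreadPoolExecutor replaces the task runner, the helper name outer and the returned (results, errors) pair are hypothetical, the ValueError replaces the bare Exception so the message is readable, and map_task_2 is changed to return a value so there is something to collect.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple


def map_task(x: int, y: int) -> int:
    # Same failure condition as in the thread: only x == 2 fails
    if x == 2:
        raise ValueError("simulated failure for x == 2")
    return x + y


def map_task_2(nr: int) -> int:
    # Placeholder second stage (assumption: any per-item computation)
    return nr * 10


def outer(numbers: List[int], static_num: int) -> Tuple[Dict[int, int], Dict[int, str]]:
    """Run both stages per item; collect successes and failures separately."""
    errors: Dict[int, str] = {}
    with ThreadPoolExecutor() as pool:
        # Stage 1: one future per input item
        first = {x: pool.submit(map_task, x, static_num) for x in numbers}
        second = {}
        for x, future in first.items():
            exc = future.exception()  # waits for the future; None on success
            if exc is not None:
                errors[x] = repr(exc)  # record the failure and move on
                continue
            # Stage 2 is submitted only for items whose stage 1 succeeded
            second[x] = pool.submit(map_task_2, future.result())
        results = {x: future.result() for x, future in second.items()}
    return results, errors


if __name__ == "__main__":
    results, errors = outer([1, 2, 3, 4, 5], 2)
    print(results)
    print(errors)
```

The loop checks stage-1 futures in input order, which keeps the sketch short; a version that reacts to whichever item finishes first could use concurrent.futures.as_completed instead.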

Neil Natarajan

08/12/2022, 3:44 PM
Understood. That requires a lot of code changes when migrating from 1.x to 2.0. If the same behavior could be added to the .map() function, that would make the migration and the overall experience smoother, IMO.
Thank you for the help!

Zanie

08/12/2022, 6:53 PM
Hey Neil, can you open an enhancement request on GitHub? We should be able to find a way to support chained mapping with errors.

Neil Natarajan

08/12/2022, 7:15 PM
Yes, I can totally do that.
Thank you!