n
In Prefect 2.0, what is the best way to chain results between map tasks? Specifically, how should errors on specific iterations be handled if the remaining results are still desired? This seemed seamless in Prefect 1.x, but I am running into issues while migrating to 2.0.
k
You could add .submit() to each task to make the tasks inside your flow run asynchronously: https://docs.prefect.io/concepts/task-runners/#using-a-task-runner
n
I see, so task mapping in 2.0 is actually just synchronous? And .submit is actually async now?
@prefect-admin
j
Check out the examples of mapping here. Mapping will submit to the task runner automatically.
n
@Neil Natarajan it should execute concurrently by default, for example:
from prefect import flow, task

@task
def square(x: int) -> int:
    return x**2

@flow
def my_flow():
    numbers = list(range(10))
    # .map creates one task run per element of the iterable
    squared_numbers = square.map(numbers)

my_flow()
This will give logs like:
18:27:44.809 | INFO    | prefect.engine - Created flow run 'elegant-coua' for flow 'my-flow'
...
18:27:45.846 | INFO    | Task run 'square-2b9b4656-0' - Finished in state Completed()
18:27:45.904 | INFO    | Task run 'square-2b9b4656-2' - Finished in state Completed()
18:27:45.912 | INFO    | Task run 'square-2b9b4656-8' - Finished in state Completed()
18:27:45.915 | INFO    | Task run 'square-2b9b4656-5' - Finished in state Completed()
18:27:45.925 | INFO    | Task run 'square-2b9b4656-3' - Finished in state Completed()
18:27:45.936 | INFO    | Task run 'square-2b9b4656-6' - Finished in state Completed()
18:27:45.936 | INFO    | Task run 'square-2b9b4656-7' - Finished in state Completed()
18:27:45.957 | INFO    | Task run 'square-2b9b4656-4' - Finished in state Completed()
18:27:46.015 | INFO    | Task run 'square-2b9b4656-9' - Finished in state Completed()
18:27:46.140 | INFO    | Task run 'square-2b9b4656-1' - Finished in state Completed()
18:27:46.252 | INFO    | Flow run 'elegant-coua' - Finished in state Completed('All states completed.')
n
Thank you all for the help.
Circling back to the original question: suppose there are two map tasks that chain input, as in this example:
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner  # requires the prefect-dask package


@task(name="map_task")
def map_task(x, y):
    if x == 2:
        raise Exception
    return x + y


@task(name="map task 2")
def map_task_2(nr):
    print(f"Map Task 2: {time.time()}: {nr}")
    time.sleep(5)
    print(f"Map Task 2: {time.time()} : {nr}")


@flow(name="outer flow", task_runner=DaskTaskRunner())
def outer_flow(numbers, static_num):
    futures = map_task.map(x=numbers, y=[static_num] * len(numbers))
    map_task_2.map(nr=futures)

if __name__ == "__main__":
    workflow_result = outer_flow([1, 2, 3, 4, 5], 2)
In this example I still want the results for the iterations where x is 1, 3, 4, or 5, but when I run this, the entire workflow fails and none of the other iterations run.
k
It is because map_task_2.map(nr=futures) is blocked by futures = map_task.map(x=numbers, y=[static_num] * len(numbers)). For example:
from prefect import flow, task
from time import sleep

@task(name="Task 1")
def task_1():
    sleep(5)

@task(name="Task 2")
def task_2():
    sleep(2)


@flow
def my_flow():
    task_1()
    task_2()


if __name__ == "__main__":
    my_flow()
Output:
09:37:44.528 | INFO    | prefect.engine - Created flow run 'prehistoric-hyrax' for flow 'my-flow'
09:37:45.275 | INFO    | Flow run 'prehistoric-hyrax' - Created task run 'Task 1-1619c4ff-0' for task 'Task 1'
09:37:45.276 | INFO    | Flow run 'prehistoric-hyrax' - Executing 'Task 1-1619c4ff-0' immediately...
09:37:50.594 | INFO    | Task run 'Task 1-1619c4ff-0' - Finished in state Completed()
09:37:50.679 | INFO    | Flow run 'prehistoric-hyrax' - Created task run 'Task 2-49f2f021-0' for task 'Task 2'
09:37:50.680 | INFO    | Flow run 'prehistoric-hyrax' - Executing 'Task 2-49f2f021-0' immediately...
09:37:52.938 | INFO    | Task run 'Task 2-49f2f021-0' - Finished in state Completed()
09:37:53.041 | INFO    | Flow run 'prehistoric-hyrax' - Finished in state Completed('All states completed.')
Task 2 will wait for Task 1 to execute.
But adding submit will make the tasks run concurrently:
from prefect import flow, task
from time import sleep

@task(name="Task 1")
def task_1():
    sleep(5)

@task(name="Task 2")
def task_2():
    sleep(2)


@flow
def my_flow():
    task_1.submit()
    task_2.submit()


if __name__ == "__main__":
    my_flow()
Output:
09:38:35.746 | INFO    | prefect.engine - Created flow run 'amiable-marmot' for flow 'my-flow'
09:38:36.462 | INFO    | Flow run 'amiable-marmot' - Created task run 'Task 1-1619c4ff-0' for task 'Task 1'
09:38:36.463 | INFO    | Flow run 'amiable-marmot' - Submitted task run 'Task 1-1619c4ff-0' for execution.
09:38:36.603 | INFO    | Flow run 'amiable-marmot' - Created task run 'Task 2-49f2f021-0' for task 'Task 2'
09:38:36.603 | INFO    | Flow run 'amiable-marmot' - Submitted task run 'Task 2-49f2f021-0' for execution.
09:38:38.850 | INFO    | Task run 'Task 2-49f2f021-0' - Finished in state Completed()
09:38:41.711 | INFO    | Task run 'Task 1-1619c4ff-0' - Finished in state Completed()
09:38:41.810 | INFO    | Flow run 'amiable-marmot' - Finished in state Completed('All states completed.')
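For anyone who wants to see the same blocking-versus-submitted difference without a Prefect installation, here is a minimal sketch using Python's standard-library concurrent.futures. It is only an analogy: ThreadPoolExecutor.submit is not Prefect's .submit, and the shortened sleep times and the run_submitted helper are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def task_1():
    sleep(0.5)  # stands in for the slower "Task 1"
    return "Task 1"


def task_2():
    sleep(0.1)  # stands in for the faster "Task 2"
    return "Task 2"


def run_submitted():
    """Submit both tasks at once and return their names in completion order."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(task_1), pool.submit(task_2)]
        # as_completed yields each future as soon as it finishes
        return [f.result() for f in as_completed(futures)]


finish_order = run_submitted()
print(finish_order)
```

As in the logs above, the shorter task finishes first even though it was submitted second, because neither call waits for the other.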
So to make sure your second task still runs when the first task fails, do something like this:
import time

from prefect import flow, task


@task(name="map_task")
def map_task(x, y):
    if x == 2:
        raise Exception
    return x + y


@task(name="map task 2")
def map_task_2(nr):
    print(f"Map Task 2: {time.time()}: {nr}")
    time.sleep(5)
    print(f"Map Task 2: {time.time()} : {nr}")


@flow(name="outer flow")
def outer_flow(numbers, static_num):
    # Submit one task run per item so each item gets its own future.
    futures = [map_task.submit(x=num, y=static_num) for num in numbers]
    # Each downstream run depends only on its own upstream future.
    for future in futures:
        map_task_2.submit(nr=future)

if __name__ == "__main__":
    workflow_result = outer_flow([1, 2, 3, 4, 5], 2)
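The idea behind per-item submission is that each second-stage call depends on exactly one first-stage future, so one failure does not take the other items down with it. Below is a rough standard-library sketch of that idea. It uses concurrent.futures rather than Prefect, the body of map_task_2 is a placeholder, and chain_per_item is a hypothetical helper name introduced only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def map_task(x, y):
    if x == 2:
        raise ValueError("simulated failure for x == 2")
    return x + y


def map_task_2(nr):
    return nr * 10  # placeholder for the real second stage


def chain_per_item(numbers, static_num):
    """Run both stages per item; collect results and errors separately."""
    results, errors = {}, {}
    with ThreadPoolExecutor() as pool:
        # Stage one: one future per input item.
        first = {x: pool.submit(map_task, x, static_num) for x in numbers}
        second = {}
        for x, fut in first.items():
            exc = fut.exception()  # blocks until this item has finished
            if exc is not None:
                errors[x] = exc  # record the failure and skip stage two
            else:
                second[x] = pool.submit(map_task_2, fut.result())
        # Stage two: gather results for the items that survived stage one.
        for x, fut in second.items():
            results[x] = fut.result()
    return results, errors


results, errors = chain_per_item([1, 2, 3, 4, 5], 2)
print(results)
print(sorted(errors))
```

Here the item with x == 2 ends up in errors while the remaining four items still pass through both stages.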
n
Understood. That requires a lot of code changes when migrating from 1.x to 2.0, though. If the same behavior could be added to the .map() function, that would make the migration and experience smoother imo.
Thank you for the help!
z
Hey Neil, can you open an enhancement request on GitHub? We should be able to find a way to support chained mapping with errors.
n
Yes I can totally do that
Thank you