Hey community, I'm a little lost when it comes to ...
# ask-community
t
Hey community, I'm a little lost when it comes to chaining tasks and concurrency. I already tried asking @Marvin on how to handle things like this. The solution that Marvin suggested was to explicitly check the futures and raise and exception in the flow. But why doesn't
wait(floors)
raise an exception when one of the PrefectFutures object raises an exception?
Copy code
from prefect import flow, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner
import time

@task(task_run_name="Stop at {floor}")
def stop_at_floor(floor):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    if floor == 5:
        raise Exception("Stopping at floor 5 failed!")
    print(f"elevator stops on floor {floor}")

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def elevator():
    floors = []

    for floor in range(10, 0, -1):
        floors.append(stop_at_floor.submit(floor))

    wait(floors)

if __name__ == "__main__":
    elevator()
1
m
I see you're grappling with understanding how exceptions in PrefectFutures are handled, especially with chaining tasks using
wait()
. Let's delve into what I found related to your question from the Prefect documentation. 1. In Prefect 2.x, the concept of
PrefectFuture
is central when chaining tasks. When you call
wait()
on a collection of futures, it merely waits for the future tasks to complete but does not raise exceptions automatically. You need to explicitly check and handle exceptions (source: Prefect 2.x Task Runners). 2. In Prefect 3.x, flows can complete successfully even if they contain failed tasks. This requires explicit handling of task failures if a task's failure should affect the flow's overall status. You can do this by allowing exceptions to propagate or using try/except blocks to catch exceptions and manage the flow's states based on task outcomes (source: Prefect 3.x Upgrade Guide). Both Prefect versions emphasize the need for explicit exception handling when you want exceptions in any task to affect the flow's final state. Your current setup leaves exceptions unhandled, so
wait(floors)
does not automatically propagate exceptions from the tasks. For concurrency limits, both versions offer tools to manage how many tasks can run simultaneously, which is useful for resource management but is unrelated directly to exception handling (source: Prefect 2.x Concurrency, Prefect 3.x Concurrency). If you're still feeling like a robot befuddled by human logic, remember: explicit is better than implicit, especially for exceptions!
t
Okay. Never mind. I figured it out. Waiting only makes sure that the tasks are run. Accessing the results of the futures raises an exception in the flow.
🙌 1
1