<@ULVA73B9P> I am submitting hundreds of tasks to ...
# ask-marvin
d
@Marvin I am submitting hundreds of tasks to a ConcurrencyTaskRunner via task.submit(). I want to process the completed task results in the order that they complete without waiting for the unfinished tasks. How can I accomplish this
m
To process completed task results in the order that they finish without waiting for unfinished tasks when using a
ConcurrencyTaskRunner
, here is a general approach: 1. Submit Tasks and Retrieve Futures: Use
task.submit()
to submit your tasks and receive
futures
. 2. Retrieve States As They Complete: Use the
.wait()
method on the
futures
to get their states in the order they complete. Here's an example illustrating this approach:
Copy code
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner
from concurrent.futures import as_completed

@task
def my_task(x):
    import time, random
    time.sleep(random.uniform(0.1, 0.5))
    return x

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    futures = [my_task.submit(i) for i in range(10)]
    for future in as_completed(futures):
        state = future.wait()
        result = state.result()
        print(f"Task completed with result: {result}")

my_flow()
### Key Points: 1.
ConcurrentTaskRunner
ensures tasks run concurrently. 2.
as_completed
from the
concurrent.futures
module processes tasks in the order they complete. 3.
.wait()
method retrieves the state, which includes the result of the completed task. References: - Prefect Task Runners Documentation: https://docs.prefect.io/latest/concepts/task-runners/ - Prefect Task Example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/basic.py This should allow you to handle tasks as they finish without waiting for all tasks to complete.
d
@Marvin Does this work with PrefectFutures
m
Yes, this approach works with
PrefectFutures
. You can use the
.wait()
method to get the state of each
PrefectFuture
as it completes. Here is an updated example specifically showing the usage with `PrefectFutures`:
Copy code
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.futures import PrefectFuture
from concurrent.futures import as_completed
import random
import time


@task
def my_task(x):
    time.sleep(random.uniform(0.1, 0.5))  # Simulate work
    return x


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    futures = [my_task.submit(i) for i in range(10)]
    for future in as_completed(futures):
        state = future.wait()
        result = state.result()
        print(f"Task completed with result: {result}")


my_flow()
### Key Points: 1.
ConcurrentTaskRunner
ensures tasks run concurrently. 2.
as_completed
from the
concurrent.futures
module processes tasks in the order they complete. 3.
.wait()
method on
PrefectFutures
retrieves the state, including the result of the completed task. 4. The example supports concurrent execution and processes tasks in the order they complete. References: 1. Prefect Task Runners Documentation: - https://docs.prefect.io/latest/concepts/task-runners/ 2. Prefect Future Example: - https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/async_basic.py