# ask-marvin
d
@Marvin How do I submit tasks using a `ConcurrentTaskRunner` and stream the results as they finish, in any order? I notice that several results complete before I have even processed the result of the first task.
m
To submit tasks using the `ConcurrentTaskRunner` and stream the results as they finish in any order, you can use Prefect's non-blocking task submission. Here are the key steps and concepts:

1. Concurrent task runner: it enables concurrent (non-blocking) execution of tasks. Tasks are submitted to a thread pool managed by `anyio`.
2. Submitting and waiting for tasks:
   - Use `submit()` to run tasks concurrently without blocking the flow.
   - Use `future.wait()` if you need to block until a specific task completes.
3. Streaming results: to stream the results as tasks complete, use asynchronous task submission and result retrieval.

Here's an example to illustrate:
```python
import time
import asyncio
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(value: int):
    time.sleep(value)
    return value

@flow(task_runner=ConcurrentTaskRunner())
async def my_flow():
    # .submit() returns a future immediately; the tasks run concurrently.
    futures = [my_task.submit(i) for i in range(5)]
    for future in futures:
        # .result() blocks until this particular task finishes, so results
        # are handled in submission order, not completion order.
        result = future.result()
        print(f'Task with result {result} has completed.')

if __name__ == "__main__":
    asyncio.run(my_flow())
```
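This example submits multiple tasks to run concurrently and then prints each result. Note that the loop waits on the futures in the order they were submitted, so a task that finishes early is not reported until every task submitted before it has finished; with these sleep times, completion order happens to match submission order. For more details, refer to the Prefect Concepts on Task Runners and results handling. You can also check out another code example on GitHub.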
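The ordering problem from the original question can be reproduced without Prefect. The minimal sketch below is an illustration, not Prefect's API: it assumes the standard library's `concurrent.futures.ThreadPoolExecutor` as a stand-in for the task runner, and `slow_square` and `handled_order` are hypothetical helpers. It records the order in which results are handled when futures are read in submission order versus with `concurrent.futures.as_completed`, which yields each future as soon as it finishes.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(value: int, delay: float) -> int:
    # Stand-in for a Prefect task: sleep for `delay` seconds, then return a result.
    time.sleep(delay)
    return value * value


def handled_order(in_completion_order: bool) -> list[int]:
    # The first task submitted is the slowest, so completion order differs from submission order.
    delays = {1: 0.6, 2: 0.2, 3: 0.4}
    handled = []
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        futures = [pool.submit(slow_square, v, d) for v, d in delays.items()]
        # Plain iteration waits on each future in submission order;
        # as_completed() yields each future as soon as it finishes.
        ordered = as_completed(futures) if in_completion_order else futures
        for future in ordered:
            handled.append(future.result())
    return handled


print(handled_order(in_completion_order=False))  # submission order
print(handled_order(in_completion_order=True))   # completion order
```

In the submission-order run, the results of the two faster tasks are already available but sit unread until the slow first task finishes. If you are on Prefect 3.x, `prefect.futures.as_completed` appears to be the analogous helper for Prefect futures; confirm it against your installed version before relying on it.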
d
@Marvin The `.submit()` call is already async, so why do I need to make my function itself async?
m
You're right: `.submit()` already handles submitting tasks asynchronously, so you don't need to make your flow function itself `async`. Here's the revised approach using `.submit()` and waiting on task results:
```python
import time
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(value: int):
    time.sleep(value)
    return value

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    futures = [my_task.submit(i) for i in range(5)]
    for future in futures:
        future.wait()             # block until this particular task is finished
        result = future.result()  # then fetch its return value
        print(f'Task with result {result} has completed.')

if __name__ == "__main__":
    my_flow()
```
In this example:

- The flow `my_flow` submits multiple tasks concurrently using `.submit()`.
- It then visits the futures one at a time, in submission order, blocking until each task is finished before printing its result (via `future.wait()` and `future.result()`).

This keeps the flow function synchronous. Note, however, that it handles results in submission order: if a later task finishes first, its result is not printed until every earlier task has also finished.
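Waiting on one specific future at a time is what makes the loop above process results in submission order. A sketch of the alternative, again using the standard library's `concurrent.futures` as an assumed stand-in for the task runner (not Prefect's API; `work` and `handle_as_they_finish` are hypothetical names), waits only until *any* pending future is done, handles it, and repeats with the rest:

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def work(value: int) -> int:
    # Stand-in task: larger values sleep longer, so they finish later.
    time.sleep(value * 0.15)
    return value


def handle_as_they_finish(values: list[int]) -> list[int]:
    handled = []
    with ThreadPoolExecutor(max_workers=len(values)) as pool:
        pending = {pool.submit(work, v) for v in values}
        while pending:
            # Block only until *some* future is done, never on one specific future.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                handled.append(future.result())
    return handled


print(handle_as_they_finish([4, 1, 3, 2]))
```

`wait(..., return_when=FIRST_COMPLETED)` returns a `(done, not_done)` pair, so each pass handles whatever has finished and keeps waiting on the remainder; no slow task holds back results that are already available.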
d
@Marvin I don’t want to block until tasks are finished. I want you to return the output of tasks as they finish without waiting on any particular result
m
Understood. If you want to process the results as tasks finish without blocking on any particular result, you can use `anyio` or `asyncio` to handle the concurrency and await completion of the tasks independently. Here's a revised example using `anyio` to collect the results as they complete:
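```python
import time
import anyio
from anyio import to_thread
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(value: int):
    time.sleep(value)
    return value

@flow(task_runner=ConcurrentTaskRunner())
async def my_flow():
    # Submit all tasks up front; each .submit() returns a future immediately.
    futures = [my_task.submit(i) for i in range(5)]

    async def report(future):
        # future.result() blocks, so run it in a worker thread to keep the event loop free.
        result = await to_thread.run_sync(future.result)
        print(f'Task with result {result} has completed.')

    # One child task per future: each prints as soon as its own task finishes,
    # so results appear in completion order rather than submission order.
    async with anyio.create_task_group() as tg:
        for future in futures:
            tg.start_soon(report, future)

if __name__ == "__main__":
    anyio.run(my_flow)
```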
This example does the following:

1. Submits multiple tasks concurrently.
2. Uses `anyio.create_task_group()` to create a task group in which each result is awaited independently, so no single task blocks the others.
3. Processes each result as soon as its task completes, in completion order, without blocking the flow on any particular task.

You can use `asyncio` instead if you're more comfortable with it; `anyio` is a natural fit here because Prefect itself depends on it.
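For the `asyncio` route mentioned above, a stdlib-only sketch of the same idea follows. It assumes the blocking wait on each task can be modelled by a plain function (`blocking_task` and `stream_results` are hypothetical names, and this is not Prefect's API): `asyncio.to_thread` runs each blocking call in a worker thread, and `asyncio.as_completed` yields the results in the order they finish. It requires Python 3.9 or later for `asyncio.to_thread`.

```python
import asyncio
import time


def blocking_task(value: int) -> int:
    # Hypothetical stand-in for a blocking call such as waiting on a task result.
    time.sleep(value * 0.15)
    return value


async def stream_results(values: list[int]) -> list[int]:
    # Start each blocking call in a worker thread; none of them blocks the event loop.
    pending = [asyncio.to_thread(blocking_task, v) for v in values]
    handled = []
    # as_completed yields awaitables in completion order, not submission order.
    for next_done in asyncio.as_completed(pending):
        result = await next_done
        print(f"Task with result {result} has completed.")
        handled.append(result)
    return handled


if __name__ == "__main__":
    asyncio.run(stream_results([3, 1, 2]))
```

Inside a Prefect async flow, the same shape would wrap each future's blocking `result()` call in `asyncio.to_thread`; that adaptation is an untested assumption here.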