Derek
07/25/2024, 5:46 PM

Marvin
07/25/2024, 5:46 PM
`anyio`.
2. Submitting and Waiting for Tasks:
- Use `submit()` to run tasks concurrently without blocking the flow.
- Use `future.wait()` if you need to block until a specific task completes.
3. Streaming Results: To stream the results as tasks complete, use asynchronous task submission and result retrieval.
Here's an example to illustrate:
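```python
import time
import asyncio
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(value: int):
    time.sleep(value)  # simulate work that takes `value` seconds
    return value

@flow(task_runner=ConcurrentTaskRunner())
async def my_flow():
    # Submit all tasks up front so they run concurrently.
    futures = [my_task.submit(i) for i in range(5)]
    for future in futures:
        # PrefectFuture.result() has no `fetch` parameter. This await assumes
        # Prefect 2.x, where result() is awaitable inside an async flow.
        result = await future.result()
        print(f'Task with result {result} has completed.')

if __name__ == "__main__":
    asyncio.run(my_flow())
```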
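This example submits multiple tasks to run concurrently and then prints each result in submission order. Because task `i` sleeps for `i` seconds, submission order happens to match completion order here, so each result appears as its task completes.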
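To see what happens when submission order and completion order differ, here is a minimal sketch that uses the standard library's `concurrent.futures` thread pool as a stand-in for the task runner. This is not Prefect's API: `slow_task`, `in_submission_order`, and `in_completion_order` are hypothetical names chosen for illustration. Some Prefect versions also provide an `as_completed` helper in `prefect.futures` that plays the same role; check the documentation for your version before relying on it.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_task(value: int) -> int:
    # Hypothetical stand-in for a task: higher values finish sooner
    # (0 -> 0.45 s, 1 -> 0.30 s, 2 -> 0.15 s).
    time.sleep((3 - value) * 0.15)
    return value


def in_submission_order() -> list[int]:
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(slow_task, i) for i in range(3)]
        # Blocks on futures[0] first, even though it is the slowest task.
        return [f.result() for f in futures]


def in_completion_order() -> list[int]:
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(slow_task, i) for i in range(3)]
        # as_completed() yields each future as soon as it finishes.
        return [f.result() for f in as_completed(futures)]


if __name__ == "__main__":
    print(in_submission_order())  # [0, 1, 2]
    print(in_completion_order())  # [2, 1, 0]
```

Iterating the futures list directly always gives submission order; only a completion-ordered iterator lets a fast task report before a slow one submitted earlier.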
For more details, refer to the Prefect Concepts on Task Runners and results handling.
You can also check out another code example on GitHub.

Derek
07/25/2024, 5:47 PM

Marvin
07/25/2024, 5:48 PM
`.submit()` already runs tasks concurrently through the flow's task runner, so you don't need to make the flow function itself `async`.
Here's the revised approach using `.submit()` and waiting on task results:
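```python
import time
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(value: int):
    time.sleep(value)  # simulate work that takes `value` seconds
    return value

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # .submit() returns a future immediately; the task runner executes the tasks concurrently.
    futures = [my_task.submit(i) for i in range(5)]
    for future in futures:
        future.wait()             # block until this task has finished
        result = future.result()  # then retrieve its return value
        print(f'Task with result {result} has completed.')

if __name__ == "__main__":
    my_flow()
```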
In this example:
- The flow `my_flow` submits multiple tasks concurrently using `.submit()`.
- It waits for each task to complete using `future.wait()`, which blocks until the task is finished.
- Then, it retrieves the result using `future.result()` and prints it.
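The submit-then-wait pattern in the bullets above has a close analogue in the standard library. The sketch below uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for the task runner; this is an assumption for illustration only, and `slow_task` and `submit_then_wait` are hypothetical names, not Prefect APIs. It shows that `submit()` returns almost immediately and that only the `wait()` call blocks.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_task(value: int) -> int:
    # Hypothetical stand-in for a Prefect task: sleeps `value` tenths of a second.
    time.sleep(value / 10)
    return value


def submit_then_wait() -> tuple[float, list[int]]:
    with ThreadPoolExecutor() as executor:
        start = time.perf_counter()
        # submit() hands the work to the pool and returns a future immediately.
        futures = [executor.submit(slow_task, i) for i in range(3)]
        submit_elapsed = time.perf_counter() - start
        # wait() blocks until every future in the list has finished.
        wait(futures)
        return submit_elapsed, [f.result() for f in futures]


if __name__ == "__main__":
    elapsed, results = submit_then_wait()
    print(f"submitting took {elapsed:.3f}s, results: {results}")
```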
With this approach, you can process the results of submitted tasks without making your flow function asynchronous. Note that the loop visits futures in submission order: if an early task is slow, results from later tasks that have already finished wait behind it.

Derek
07/25/2024, 5:48 PM

Marvin
07/25/2024, 5:48 PM
`anyio` or `asyncio` to handle the concurrency and await completion of the tasks independently.
Here's a revised example using `anyio` to collect the results as they complete:
```python
import time
import anyio
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(value: int):
    time.sleep(value)  # simulate work that takes `value` seconds
    return value

@flow(task_runner=ConcurrentTaskRunner())
async def my_flow():
    futures = [my_task.submit(i) for i in range(5)]

    async def report(future):
        # Assumes Prefect 2.x, where future.result() is awaitable inside an async flow.
        result = await future.result()
        print(f'Task with result {result} has completed.')

    # Start one coroutine per future; each prints as soon as its own task finishes.
    async with anyio.create_task_group() as tg:
        for future in futures:
            tg.start_soon(report, future)

if __name__ == "__main__":
    anyio.run(my_flow)
```
This example does the following:
1. Submits multiple tasks concurrently.
2. Uses `anyio.create_task_group()` to create a task group and await the result of each task independently.
3. Processes the results as soon as they are completed without blocking the flow execution.
You can use `asyncio` instead if you're more comfortable with it, but Prefect itself uses `anyio` internally, so `anyio` fits more naturally with Prefect's concurrency patterns.
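For the `asyncio` route, here is a minimal, self-contained sketch of the same "handle each result as soon as it finishes" idea using only the standard library. It does not use Prefect: `asyncio.to_thread` stands in for task submission, and `blocking_task` and `collect_as_completed` are hypothetical names. It requires Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


def blocking_task(value: int) -> int:
    # Hypothetical stand-in for `my_task`: higher values finish sooner.
    time.sleep((3 - value) * 0.15)
    return value


async def collect_as_completed() -> list[int]:
    # Run each blocking call in a worker thread so the calls overlap.
    tasks = [asyncio.create_task(asyncio.to_thread(blocking_task, i)) for i in range(3)]
    finished: list[int] = []
    # as_completed() yields awaitables in the order the underlying tasks finish.
    for next_done in asyncio.as_completed(tasks):
        result = await next_done
        print(f"Task with result {result} has completed.")
        finished.append(result)
    return finished


if __name__ == "__main__":
    asyncio.run(collect_as_completed())
```

Unlike a task group, `asyncio.as_completed` hands results back to a single loop, which is convenient when you want to collect them into a list rather than handle each one in its own coroutine.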