Demid
08/04/2025, 10:24 AM
@task
def test(a):
    b = a+a

@flow
async def test_flow():
    for i in range(10):
        test.submit(i)
Marvin
08/04/2025, 10:24 AM
Marvin
08/04/2025, 10:25 AM
```python
from prefect import flow, task
from prefect.futures import wait

@task
def test(a):
    return a + a  # Note: added return statement

@flow
async def test_flow():
    # Submit all tasks and collect their futures
    futures = []
    for i in range(10):
        futures.append(test.submit(i))
    # Wait for all tasks to complete
    wait(futures)
    # wait() returns futures, not values, so read each result explicitly
    results = [future.result() for future in futures]
    print("All tasks completed!")
    return results
```
Key points about this solution:
1. Use `wait()` from `prefect.futures` to block until all submitted tasks complete
2. Store the futures (returned by `.submit()`) in a list so you can wait for them all
3. `wait()` returns the futures grouped into `done` and `not_done`, not the task results; call `.result()` on each future to get its return value
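The submit / wait / collect shape described above resembles the standard library's `concurrent.futures`. As a rough sketch of that pattern without Prefect (this uses `concurrent.futures`, not Prefect's API; the names `double` and `run_all` and the pool size of 4 are illustrative assumptions):

```python
from concurrent.futures import ThreadPoolExecutor, wait


def double(a):
    # Stand-in for the task body (hypothetical name for this sketch)
    return a + a


def run_all():
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Submit everything first and keep the futures in submission order
        futures = [pool.submit(double, i) for i in range(10)]
        # Block until every future has finished
        done, not_done = wait(futures)
        # wait() hands back futures, not values; ask each future for its result
        results = [f.result() for f in futures]
    return results, len(done), len(not_done)
```

Reading results from the original `futures` list rather than from `done` keeps them in submission order, since `done` is an unordered set.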
A few additional tips:
- By default, Prefect uses the `ThreadPoolTaskRunner` for concurrent execution
- If you need more control over concurrency, you can specify the task runner explicitly:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
async def test_flow():
    # ... rest of the code
```
- Make sure your tasks are independent and thread-safe if they're running concurrently
- The `wait()` function blocks until all tasks complete, which makes it well suited to synchronization points in your flow
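To illustrate the thread-safety tip, here is a minimal sketch of tasks that share state across worker threads. It uses only the standard library, not Prefect; `SafeCounter` and `run_sum` are hypothetical names, and the pool size of 4 simply mirrors the `max_workers=4` example above:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


class SafeCounter:
    """Accumulator that is safe to update from several threads."""

    def __init__(self):
        self.total = 0
        self._lock = threading.Lock()

    def add(self, a):
        # Guard the read-modify-write so concurrent workers cannot interleave
        with self._lock:
            self.total += a


def run_sum(n):
    counter = SafeCounter()
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(counter.add, i) for i in range(n)]
        # Synchronization point: nothing below runs until all updates land
        wait(futures)
    return counter.total
```

Where possible, having each task return its value and combining the values after the wait avoids shared state altogether, which is usually the simpler design.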
Let me know if you have any questions about task submission or concurrent execution in Prefect!