lucian
07/09/2025, 3:29 AM
Marvin
07/09/2025, 3:30 AM
Marvin
07/09/2025, 3:30 AM
```python
from prefect import flow, task
import asyncio
from typing import List


@task
async def async_task(number: int) -> int:
    # Simulate some async work
    await asyncio.sleep(1)
    return number * 2


@flow
async def parallel_flow(numbers: List[int]):
    # Create task coroutines: calling an async task directly returns a coroutine.
    # (In Prefect 3, .submit() returns a PrefectFuture, which asyncio.gather cannot await.)
    tasks = [async_task(n) for n in numbers]
    # Wait for all tasks to complete and get results
    results = await asyncio.gather(*tasks)
    print(f"Results: {results}")
    return results


# Run the flow
if __name__ == "__main__":
    asyncio.run(parallel_flow([1, 2, 3, 4, 5]))
```
Here's a more advanced example using TaskGroups for better structured concurrency:
```python
from prefect import flow, task
import asyncio
from typing import List
from anyio import create_task_group


@task
async def async_task(number: int) -> int:
    await asyncio.sleep(1)
    return number * 2


@flow
async def parallel_flow_with_taskgroup(numbers: List[int]):
    results = []

    async def _run_task(n):
        # Await the task call directly; in Prefect 3 the future returned
        # by .submit() is not awaitable
        result = await async_task(n)
        results.append(result)

    async with create_task_group() as tg:
        for n in numbers:
            tg.start_soon(_run_task, n)

    # Results are appended in completion order, so sort before printing
    print(f"Results: {sorted(results)}")
    return results


# Run the flow
if __name__ == "__main__":
    asyncio.run(parallel_flow_with_taskgroup([1, 2, 3, 4, 5]))
```
Key points about running parallel async tasks in Prefect:
1. Use the `@task` decorator to define async tasks with `async def`
2. Create a flow using the `@flow` decorator that's also async
3. You can use either:
   - `asyncio.gather()` for simple parallel execution
   - `create_task_group()` from AnyIO for more structured concurrency (recommended)
4. Tasks submitted with `.submit()` run concurrently on the flow's task runner; `.submit()` returns a future rather than the result
5. The flow will wait for all parallel tasks to complete before finishing
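One practical difference between the two options in point 3, sketched here with plain `asyncio` and no Prefect involved (`work` and `compare_orderings` are hypothetical names used only for illustration): `asyncio.gather()` returns results in the order the awaitables were passed in, while appending to a shared list from concurrently running helpers records completion order. That is presumably why the task-group example sorts its results before printing.

```python
import asyncio


async def work(n: int, delay: float) -> int:
    # Hypothetical stand-in for an async task: wait, then double the input
    await asyncio.sleep(delay)
    return n * 2


async def compare_orderings() -> tuple[list[int], list[int]]:
    # The first input takes longest, so completion order is reversed
    delays = {1: 0.3, 2: 0.2, 3: 0.1}

    # gather() keeps the order in which the awaitables were passed in
    gathered = await asyncio.gather(*(work(n, d) for n, d in delays.items()))

    # Appending from inside each helper records completion order instead
    completed: list[int] = []

    async def run_and_record(n: int, d: float) -> None:
        completed.append(await work(n, d))

    await asyncio.gather(*(run_and_record(n, d) for n, d in delays.items()))
    return list(gathered), completed


if __name__ == "__main__":
    print(asyncio.run(compare_orderings()))
```

If the order of results matters, either rely on `gather()` or store each result under its input (for example in a dict) rather than appending.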
The TaskGroup approach (second example) is preferred because it:
- Provides better error handling
- Prevents resource leaks
- Ensures proper cleanup of tasks
- Follows structured concurrency principles
Remember that while these tasks run concurrently, they all share a single async event loop and are subject to the GIL (Global Interpreter Lock), so they do not execute Python code in parallel. For CPU-bound tasks, consider using multiprocessing instead.
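As a rough illustration of the multiprocessing suggestion, the sketch below uses only the standard library (it is not a Prefect API, and `count_primes` is a hypothetical CPU-bound function). The work is handed to a `ProcessPoolExecutor` through `loop.run_in_executor`, so each call runs in its own process and the event loop stays free.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """CPU-bound work: count the primes below `limit` by trial division."""
    count = 0
    for candidate in range(2, limit):
        if all(candidate % d for d in range(2, int(candidate ** 0.5) + 1)):
            count += 1
    return count


async def count_in_processes(limits: list[int]) -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Each call runs in a separate process, so the GIL is not shared
        jobs = [loop.run_in_executor(pool, count_primes, limit) for limit in limits]
        return await asyncio.gather(*jobs)


if __name__ == "__main__":
    print(asyncio.run(count_in_processes([20_000, 30_000, 40_000])))
```

The `if __name__ == "__main__":` guard matters here: on platforms that start worker processes by re-importing the main module, unguarded top-level code would run again in every worker.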
For more details about Prefect's async architecture, you can check out the blog post about how AnyIO powers Prefect's async architecture.
lucian
07/09/2025, 3:38 AM
Marvin
07/09/2025, 3:38 AM
Marvin
07/09/2025, 3:39 AM
```python
from prefect import flow, task
import asyncio
import time
from typing import List


# Synchronous task
@task
def sync_task(x: int) -> int:
    time.sleep(1)  # Simulating some blocking work
    return x * 2


# Asynchronous task
@task
async def async_task(x: int) -> int:
    await asyncio.sleep(1)  # Simulating some I/O-bound work
    return x * 3


# Async flow that combines both types of tasks
@flow
async def mixed_flow(numbers: List[int]):
    # Submit sync tasks to the task runner so they run concurrently in the background.
    # .submit() works for both sync and async tasks and returns a PrefectFuture.
    sync_futures = [sync_task.submit(n) for n in numbers]

    # Run async tasks concurrently: calling an async task returns a coroutine
    async_completed = await asyncio.gather(*(async_task(n) for n in numbers))

    # Collect the sync results. In Prefect 3 a PrefectFuture is not awaitable,
    # so it cannot be passed to asyncio.gather; use .result() instead.
    sync_completed = [future.result() for future in sync_futures]

    print(f"Sync results: {sync_completed}")
    print(f"Async results: {async_completed}")
    return sync_completed + async_completed


# Alternative: Synchronous flow with mixed tasks
@flow
def sync_mixed_flow(numbers: List[int]):
    # This will work too! .submit() can run async tasks from a sync flow
    sync_futures = [sync_task.submit(n) for n in numbers]
    async_futures = [async_task.submit(n) for n in numbers]
    # In a sync flow, you don't need asyncio.gather;
    # .result() blocks until each task has finished
    return [future.result() for future in sync_futures + async_futures]


# Run the async flow
if __name__ == "__main__":
    asyncio.run(mixed_flow([1, 2, 3]))
    # Or run the sync flow
    # sync_mixed_flow([1, 2, 3])
```
Key points about mixing async and sync tasks:
1. Task Definition Flexibility:
   - You can freely mix `async def` and regular `def` tasks
   - Prefect handles the conversion between sync and async execution automatically
2. Flow Level Choice:
   - Your flow can be either async (`async def`) or sync (`def`)
   - Both types of flows can handle both types of tasks
3. Task Submission:
   - Use `.submit()` for both async and sync tasks
   - It returns a `PrefectFuture`; in Prefect 3 you call `.result()` or `.wait()` on it rather than awaiting it
4. Best Practices:
   - Use async tasks for I/O-bound operations (API calls, database operations)
   - Use sync tasks for blocking or CPU-bound operations
   - Group similar tasks together for better readability
   - Consider using TaskGroups for more complex parallel execution patterns
5. Important Considerations:
   - Submitted sync tasks run in a thread pool (with the default task runner) to prevent blocking the event loop
   - Async tasks awaited directly run in the event loop
   - All tasks are executed with proper concurrency management by Prefect
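The thread-pool point in item 5 can be sketched with plain `asyncio`. This is an analogy and not Prefect's internals; `blocking_work`, `async_work`, and `run_mixed` are hypothetical names. Blocking functions are pushed to worker threads with `asyncio.to_thread` (Python 3.9+), while coroutines stay on the event loop.

```python
import asyncio
import threading
import time


def blocking_work(x: int) -> tuple[int, bool]:
    time.sleep(0.1)  # Blocks only the worker thread it runs in
    on_main_thread = threading.current_thread() is threading.main_thread()
    return x * 2, on_main_thread


async def async_work(x: int) -> int:
    await asyncio.sleep(0.1)  # Yields control back to the event loop
    return x * 3


async def run_mixed(numbers: list[int]):
    # Blocking calls go to worker threads; coroutines run on the loop itself
    blocking_jobs = [asyncio.to_thread(blocking_work, n) for n in numbers]
    async_jobs = [async_work(n) for n in numbers]
    results = await asyncio.gather(*blocking_jobs, *async_jobs)
    return results[:len(numbers)], results[len(numbers):]


if __name__ == "__main__":
    print(asyncio.run(run_mixed([1, 2, 3])))
```

Calling `blocking_work` directly inside a coroutine would stall every other coroutine for the duration of the sleep, which is the situation the thread pool avoids.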
Here's an example using TaskGroups for more complex orchestration:
```python
from prefect import flow, task
from anyio import create_task_group, to_thread
import asyncio
import time
from typing import List


@task
def sync_task(x: int) -> int:
    time.sleep(1)
    return x * 2


@task
async def async_task(x: int) -> int:
    await asyncio.sleep(1)
    return x * 3


@flow
async def complex_mixed_flow(numbers: List[int]):
    results = {"sync": [], "async": []}

    async def _run_sync(n):
        # Submit the sync task, then wait for its future in a worker thread
        # so the blocking .result() call does not stall the event loop
        future = sync_task.submit(n)
        result = await to_thread.run_sync(future.result)
        results["sync"].append(result)

    async def _run_async(n):
        # Calling an async task directly returns an awaitable
        result = await async_task(n)
        results["async"].append(result)

    async with create_task_group() as tg:
        # Start all tasks
        for n in numbers:
            tg.start_soon(_run_sync, n)
            tg.start_soon(_run_async, n)

    print(f"Sync results: {sorted(results['sync'])}")
    print(f"Async results: {sorted(results['async'])}")
    return results


# Run the flow
if __name__ == "__main__":
    # The original message is cut off after the line above; this call is an assumed completion
    asyncio.run(complex_mixed_flow([1, 2, 3]))
```
Marvin
07/09/2025, 3:39 AM
Nate
07/09/2025, 2:54 PM
Marvin
07/09/2025, 2:54 PM
Marvin
07/09/2025, 2:54 PM
The `.map()` method is indeed the simplest and most elegant way to handle concurrent execution for both sync and async tasks. Let me show the straightforward approach:
```python
from prefect import flow, task
import asyncio
import time
from typing import List


@task
def sync_task(x: int) -> int:
    time.sleep(1)  # Simulating work
    return x * 2


@task
async def async_task(x: int) -> int:
    await asyncio.sleep(1)  # Simulating async work
    return x * 3


@flow
def mixed_concurrent_flow(numbers: List[int]):
    # Run concurrent tasks using simple map; .map() returns a list of futures
    sync_futures = sync_task.map(numbers)
    async_futures = async_task.map(numbers)
    # Resolve the futures so the flow returns values instead of futures
    # (in Prefect 3 the returned PrefectFutureList has a .result() method)
    return sync_futures.result(), async_futures.result()


# Run the flow
if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]
    sync_results, async_results = mixed_concurrent_flow(numbers)
    print(f"Sync results: {sync_results}")
    print(f"Async results: {async_results}")
```
That's much cleaner! The `.map()` method handles all the concurrent execution details for you, whether the task is sync or async. Thanks for the correction - sometimes the simplest solution is the best one.
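For intuition only, the fan-out pattern that `.map()` hides can be approximated with the standard library. This is an analogy built on `concurrent.futures`, not how Prefect implements it, and `concurrent_map` and `double` are hypothetical names: one call is submitted per item, and the futures are then resolved in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def double(x: int) -> int:
    time.sleep(0.1)  # Simulating work
    return x * 2


def concurrent_map(func: Callable, items: Iterable, max_workers: int = 8) -> list:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit one call per item, then collect results in input order
        futures = [pool.submit(func, item) for item in items]
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(concurrent_map(double, [1, 2, 3, 4, 5]))
```

The same two steps (submit everything, then resolve the futures) are what the Prefect example does with `.map()` followed by `.result()`.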