lucian
07/09/2025, 3:29 AMMarvin
07/09/2025, 3:30 AMMarvin
07/09/2025, 3:30 AMpython
from prefect import flow, task
import asyncio
from typing import List
@task
async def async_task(number: int) -> int:
    # Simulate some async work
    await asyncio.sleep(1)
    return number * 2
@flow
async def parallel_flow(numbers: List[int]):
    # Create task coroutines
    tasks = [async_task.submit(n) for n in numbers]
    
    # Wait for all tasks to complete and get results
    results = await asyncio.gather(*tasks)
    print(f"Results: {results}")
    return results
# Run the flow
if __name__ == "__main__":
    asyncio.run(parallel_flow([1, 2, 3, 4, 5]))python
from prefect import flow, task
import asyncio
from typing import List
from anyio import create_task_group
@task
async def async_task(number: int) -> int:
    await asyncio.sleep(1)
    return number * 2
@flow
async def parallel_flow_with_taskgroup(numbers: List[int]):
    results = []
    
    async def _run_task(n):
        result = await async_task.submit(n)
        results.append(result)
    
    async with create_task_group() as tg:
        for n in numbers:
            tg.start_soon(_run_task, n)
    
    print(f"Results: {sorted(results)}")
    return results
# Run the flow
if __name__ == "__main__":
    asyncio.run(parallel_flow_with_taskgroup([1, 2, 3, 4, 5]))@taskasync def@flowasyncio.gather()create_task_group().submit()lucian
07/09/2025, 3:38 AMMarvin
07/09/2025, 3:38 AMMarvin
07/09/2025, 3:39 AMpython
from prefect import flow, task
import asyncio
import time
from typing import List
# Synchronous task
@task
def sync_task(x: int) -> int:
    time.sleep(1)  # Simulating some CPU-bound work
    return x * 2
# Asynchronous task
@task
async def async_task(x: int) -> int:
    await asyncio.sleep(1)  # Simulating some I/O-bound work
    return x * 3
# Async flow that combines both types of tasks
@flow
async def mixed_flow(numbers: List[int]):
    # Run sync tasks in parallel
    sync_results = []
    for n in numbers:
        # .submit() works for both sync and async tasks
        sync_results.append(sync_task.submit(n))
    
    # Run async tasks in parallel
    async_tasks = [async_task.submit(n) for n in numbers]
    
    # Wait for all tasks to complete
    # Note: Prefect handles the conversion of sync tasks to async automatically
    all_results = await asyncio.gather(
        *sync_results,
        *async_tasks
    )
    
    # Split results (first half are sync results, second half are async)
    sync_completed = all_results[:len(numbers)]
    async_completed = all_results[len(numbers):]
    
    print(f"Sync results: {sync_completed}")
    print(f"Async results: {async_completed}")
    return all_results
# Alternative: Synchronous flow with mixed tasks
@flow
def sync_mixed_flow(numbers: List[int]):
    # This will work too! Prefect handles the async/sync conversion
    sync_results = [sync_task.submit(n) for n in numbers]
    async_results = [async_task.submit(n) for n in numbers]
    
    # In a sync flow, you don't need asyncio.gather
    # Prefect handles the waiting automatically
    return sync_results + async_results
# Run the async flow
if __name__ == "__main__":
    asyncio.run(mixed_flow([1, 2, 3]))
    
    # Or run the sync flow
    # sync_mixed_flow([1, 2, 3])async defdefasync defdef.submit()Marvin
07/09/2025, 3:39 AMNate
07/09/2025, 2:54 PMMarvin
07/09/2025, 2:54 PMMarvin
07/09/2025, 2:54 PM.map()python
from prefect import flow, task
import asyncio
import time
@task
def sync_task(x: int) -> int:
    time.sleep(1)  # Simulating work
    return x * 2
@task
async def async_task(x: int) -> int:
    await asyncio.sleep(1)  # Simulating async work
    return x * 3
@flow
def mixed_concurrent_flow(numbers: List[int]):
    # Run concurrent tasks using simple map
    sync_results = sync_task.map(numbers)
    async_results = async_task.map(numbers)
    
    return sync_results, async_results
# Run the flow
if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]
    sync_results, async_results = mixed_concurrent_flow(numbers)
    print(f"Sync results: {sync_results}")
    print(f"Async results: {async_results}").map()