VIPIN SOMA
08/20/2024, 1:55 PMMarvin
08/20/2024, 1:56 PMconcurrency
context manager. Here's an example for limiting the number of parallel tasks:
import asyncio
from prefect.concurrency.sync import concurrency
async def process_data(data):
print(f"Processing: {data}")
await asyncio.sleep(1)
return f"Processed: {data}"
async def main():
data_items = list(range(100))
processed_data = []
while data_items:
with concurrency("data-processing", occupy=5):
chunk = [data_items.pop() for _ in range(5)]
processed_data += await asyncio.gather(*[process_data(item) for item in chunk])
asyncio.run(main())
For more details, refer to the documentation on global-concurrency-limits.
2. Using Async Flows:
You can create and run asynchronous flows for concurrent execution. Here's a basic example:
import asyncio
from prefect import flow
async def some_async_function():
await asyncio.sleep(1)
return "I'm an async function"
@flow(log_prints=True)
async def my_async_workflow():
print("I can call any python code here, including prefect tasks or flows")
await some_async_function()
if __name__ == "__main__":
asyncio.run(my_async_workflow())
You can find this example here.
These methods will help you run Prefect flows in parallel effectively.