Adeel Shakir
08/22/2024, 2:10 PM
Marvin
08/22/2024, 2:10 PM
`concurrency` context manager and `ConcurrentTaskRunner`. Here are the main points and resources:
1. Concurrency Context Manager: Limits how many operations run at the same time. Code entering the context requests slots from a named limit and blocks until enough slots are free.
2. Task Runners: Use `ConcurrentTaskRunner` for IO-bound tasks to handle concurrency. For more parallelism, consider `DaskTaskRunner` or `RayTaskRunner`.
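
To make the "request slots, block if none are free" behaviour in point 1 concrete, here is a minimal standard-library sketch. It is not Prefect code: `SlotLimiter`, `occupy`, and `run_demo` are hypothetical names made up for illustration, and Prefect's own limits are tracked server-side rather than in process memory.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


class SlotLimiter:
    """Hypothetical in-process stand-in for a named limit with a fixed slot count."""

    def __init__(self, limit):
        self._free = limit
        self._cond = threading.Condition()

    @contextmanager
    def occupy(self, slots=1):
        # Block until enough slots are free, then take them.
        with self._cond:
            self._cond.wait_for(lambda: self._free >= slots)
            self._free -= slots
        try:
            yield
        finally:
            # Return the slots and wake any callers that are waiting.
            with self._cond:
                self._free += slots
                self._cond.notify_all()


def run_demo(limit=2, jobs=6):
    """Run `jobs` workers under the limiter and report the peak number running at once."""
    limiter = SlotLimiter(limit)
    lock = threading.Lock()
    running = 0
    peak = 0

    def job():
        nonlocal running, peak
        with limiter.occupy(1):
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.05)  # stands in for an IO-bound read
            with lock:
                running -= 1

    with ThreadPoolExecutor(max_workers=jobs) as pool:
        for _ in range(jobs):
            pool.submit(job)
    return peak


if __name__ == "__main__":
    print("peak concurrent jobs:", run_demo())
```

Six workers are started, but the reported peak never exceeds the limit of two, because the remaining workers wait inside `occupy` until a slot is returned.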
### Resources
- Concurrency Context Manager: Global Concurrency Limits
- Task Runners: Prefect Task Runners
### Example Snippet
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.task_runners import ConcurrentTaskRunner


@task
def read_small_table(table_name):
    # Code to read from S3
    ...


@task
def read_large_table(table_name):
    # Hold one slot of the "large_tables" global concurrency limit for the
    # whole read. The limit is assumed to have been created beforehand.
    with concurrency("large_tables", occupy=1):
        # Code to read from S3
        ...


@flow(task_runner=ConcurrentTaskRunner())
def read_tables(tables):
    futures = []
    for table in tables:
        if table["size"] == "small":
            futures.append(read_small_table.submit(table["name"]))
        else:
            futures.append(read_large_table.submit(table["name"]))
    # Wait for every submitted task before the flow returns
    for future in futures:
        future.wait()


if __name__ == "__main__":
    tables = [{"name": "table1", "size": "small"}, {"name": "table2", "size": "large"}]
    read_tables(tables)
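
Where the slot is acquired matters. A slot held only while a task is being submitted is released as soon as submission returns, whereas a slot acquired inside the task body is held for the whole read. The sketch below illustrates the difference with the standard library only: `threading.BoundedSemaphore` stands in for the named limit and `ThreadPoolExecutor` for the task runner, and `peak_concurrency` and its parameters are hypothetical names, not Prefect APIs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(hold_slot_inside_job, limit=1, jobs=4):
    """Return the highest number of reads that were running at the same time."""
    slots = threading.BoundedSemaphore(limit)  # stand-in for the named limit
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def read_table():
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.2)  # stands in for a slow S3 read
        with lock:
            state["running"] -= 1

    def guarded_read():
        # The slot is held for the whole read.
        with slots:
            read_table()

    with ThreadPoolExecutor(max_workers=jobs) as pool:
        for _ in range(jobs):
            if hold_slot_inside_job:
                pool.submit(guarded_read)
            else:
                # The slot is held only while submitting, then released at once.
                with slots:
                    pool.submit(read_table)
    return state["peak"]


if __name__ == "__main__":
    print("slot held inside the job:", peak_concurrency(True))
    print("slot held around submit: ", peak_concurrency(False))
```

With a limit of 1, holding the slot inside the job keeps the peak at 1. Holding it around submission typically lets all reads overlap, because nothing occupies the slot while they run.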
Refer to the Moving Data Guide for more specifics on S3 interactions.
Feel free to ask if you need further details!