Adeel Shakir
08/22/2024, 2:10 PM
Marvin
08/22/2024, 2:10 PM
`concurrency` context manager and `ConcurrentTaskRunner`. Here are the main points and resources:
1. Concurrency Context Manager: This limits the number of concurrent tasks. You request slots, and tasks will block if slots aren't available.
2. Task Runners: Use `ConcurrentTaskRunner` for IO-bound tasks to handle concurrency. For more parallelism, consider `DaskTaskRunner` or `RayTaskRunner`.
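To make the slot behaviour in point 1 concrete, here is a minimal standard-library sketch. It is an analogue only, not Prefect's implementation: `SlotLimiter`, `read_large_tables`, the limit of 2, and the `time.sleep` stand-in for an S3 read are all hypothetical names and values chosen for illustration. Each worker must acquire a slot before reading and blocks while none are free.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SlotLimiter:
    """Hypothetical helper: hands out at most `limit` slots at a time."""

    def __init__(self, limit):
        self._slots = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0  # highest number of slots held simultaneously

    def __enter__(self):
        self._slots.acquire()  # blocks while all slots are taken
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc_info):
        with self._lock:
            self.active -= 1
        self._slots.release()
        return False


def read_large_tables(table_names, limit=2, workers=8):
    """Read every table on a thread pool, never more than `limit` at once."""
    limiter = SlotLimiter(limit)

    def read_one(name):
        with limiter:
            time.sleep(0.05)  # stand-in for a slow S3 read
            return name

    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(read_one, table_names))
    return results, limiter.peak
```

Calling `read_large_tables(["table1", "table2", "table3"], limit=2)` returns the table names in order along with the peak number of simultaneous reads, which stays at or below the limit even though the pool has more workers than slots.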
### Resources
- Concurrency Context Manager: Global Concurrency Limits
- Task Runners: Prefect Task Runners
### Example Snippet
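```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.task_runners import ConcurrentTaskRunner


@task
def read_small_table(table_name):
    # Code to read from S3
    ...


@task
def read_large_table(table_name):
    # Hold one "large_tables" slot for the duration of the read; this blocks
    # until a slot is free. Acquiring it around .submit() in the flow would
    # release it as soon as the task was scheduled.
    # Assumes a global concurrency limit named "large_tables" already exists.
    with concurrency("large_tables", occupy=1):
        # Code to read from S3
        ...


@flow(task_runner=ConcurrentTaskRunner())
def read_tables(tables):
    futures = []
    for table in tables:
        if table["size"] == "small":
            futures.append(read_small_table.submit(table["name"]))
        else:
            futures.append(read_large_table.submit(table["name"]))
    # Wait for every submitted task before the flow returns
    for future in futures:
        future.wait()


if __name__ == "__main__":
    tables = [{"name": "table1", "size": "small"}, {"name": "table2", "size": "large"}]
    read_tables(tables)
```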
Refer to the Moving Data Guide for more specifics on S3 interactions.
Feel free to ask if you need further details!