# marvin-in-the-wild
@Marvin I have a pipeline in Prefect that reads data from S3. Right now I run one table at a time, but I want the pipeline to process several small tables in parallel while running large tables only one task at a time.
To run several small tables in parallel while processing large tables one at a time, you can combine Prefect's `concurrency` context manager with `ConcurrentTaskRunner`. Here are the main points and resources:

1. Concurrency context manager: this limits how many tasks run at once. Code requests slots from a named global concurrency limit and blocks until enough slots are free. The slot is held only while the `with` block is executing, so the block has to wrap the actual read inside the task, not the `.submit()` call.
2. Task runners: use `ConcurrentTaskRunner` for I/O-bound tasks such as S3 reads. For parallelism across processes or machines, consider `DaskTaskRunner` (from the `prefect-dask` package) or `RayTaskRunner` (from `prefect-ray`).

### Resources
- Concurrency context manager: Global Concurrency Limits
- Task runners: Prefect Task Runners

### Example Snippet
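```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.task_runners import ConcurrentTaskRunner


@task
def read_small_table(table_name):
    # Code to read the table from S3
    ...


@task
def read_large_table(table_name):
    # Hold one slot of the "large_tables" global concurrency limit for the
    # whole read. Assumes that limit already exists with a limit of 1
    # (e.g. created in the Prefect UI), so large reads run one at a time.
    with concurrency("large_tables", occupy=1):
        # Code to read the table from S3
        ...


@flow(task_runner=ConcurrentTaskRunner())
def read_tables(tables):
    futures = []
    for table in tables:
        if table["size"] == "small":
            futures.append(read_small_table.submit(table["name"]))
        else:
            futures.append(read_large_table.submit(table["name"]))
    # Wait for every submitted task to finish before the flow returns
    return [future.result() for future in futures]


if __name__ == "__main__":
    tables = [
        {"name": "table1", "size": "small"},
        {"name": "table2", "size": "large"},
    ]
    read_tables(tables)
```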
Refer to the Moving Data Guide for more specifics on S3 interactions. Feel free to ask if you need further details!