Maity

08/07/2023, 1:09 AM
I'm getting a "QueuePool limit of size 5 overflow 10 reached" error when calling .map with around 700 tasks. How do I handle this and prevent the overflow/timeout from occurring?
from prefect import task
from prefect.tasks import exponential_backoff
from requests import Response  # assuming api_get returns requests responses

# create an API request for each self link
api_results = api_get.map([x["self"] for x in employee_self_links])

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
)
def write(employee: Response):
    # parse the JSON body of the API response
    data = employee.json()

    # write each employee record to its own file
    dest = f"raw/employees/details/{data['id']}.json"
    write_to_file_system.fn(dest, data)

# save the results to storage
storage_results = write.map(api_results)
Is it normal/good practice to call .map in batches in order to avoid the error? I.e. something like this:
from prefect import task
from prefect.tasks import exponential_backoff
from requests import Response

# Function to split a list into batches
def batch(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# Assuming employee_self_links is available
employee_links = [x["self"] for x in employee_self_links]
api_results = []

# Process the links in batches of 10
for batch_links in batch(employee_links, 10):
    batch_results = api_get.map(batch_links)
    api_results.extend(batch_results)

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
)
def write(employee: Response):
    data = employee.json()
    dest = f"raw/employees/details/{data['id']}.json"
    write_to_file_system.fn(dest, data)

# Save the results to storage
storage_results = write.map(api_results)
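One thing I'm unsure about with the above: .map returns futures immediately, so the loop submits every batch up front and may not actually throttle anything. I think each batch needs to be waited on before the next is submitted, something like this (a sketch using PrefectFuture.wait()):

# wait for each batch to finish before submitting the next one
for batch_links in batch(employee_links, 10):
    batch_results = api_get.map(batch_links)
    for future in batch_results:
        # block until this task run reaches a final state
        future.wait()
    api_results.extend(batch_results)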
Any suggestions, anyone?

Matt Klein

08/16/2023, 3:55 PM
This PR should address your issue by letting you configure the size of the DB connection pool beyond its default (pool size 5 with overflow 10).
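Once that's released, you should be able to raise the limits before the server process starts, along these lines (a sketch; the setting names are my assumption from the PR discussion, so double-check the release notes):

# assumed setting names from the PR: raise the SQLAlchemy pool limits
# for the Prefect server by setting them before the process starts
import os

os.environ["PREFECT_SQLALCHEMY_POOL_SIZE"] = "20"
os.environ["PREFECT_SQLALCHEMY_MAX_OVERFLOW"] = "40"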

Maity

08/25/2023, 4:21 AM
I fixed this with batching; here is an example:
from prefect import flow, unmapped

@flow
def ingest_cost_center():
    # get all the cost centers
    cost_centers_listings = ingest_cost_center_listing()

    # extract the IDs from each cost center
    cost_center_ids = [
        cc["id"]
        for cc_response in cost_centers_listings
        for cc in cc_response["cost_centers"]
    ]

    # create the API paths for each cost center request
    cost_center_paths = [
        f"v2/companies/{API_COMPANY}/config/cost-centers/{cc_id}"
        for cc_id in cost_center_ids
    ]

    results = []
    batch_size = 10

    # perform the remainder of tasks in batches
    while len(cost_center_paths) > 0:
        batch_cost_center_paths = cost_center_paths[:batch_size]
        cost_center_paths = cost_center_paths[batch_size:]

        # for each API path, execute the API request
        cost_centers_responses = api_get.map(batch_cost_center_paths)

        writes = write_to_file_system.map(
            # unmapped marks the path template as a static argument
            unmapped("raw/cost_centers/details/{id}.json"),
            cost_centers_responses,
            # return_state=True forces the flow to wait for this batch
            # to finish before submitting the next one
            return_state=True,
        )

        results += writes

    return results
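return_state=True is what makes the batching actually throttle: asking for states blocks the flow until the batch resolves before the next .map call. It also means results is a list of State objects, so failures can be checked at the end, e.g. (a sketch against Prefect 2's State API):

# each item in `results` is a State because of return_state=True,
# so failed writes can be counted once every batch has run
failed = [state for state in results if state.is_failed()]
if failed:
    print(f"{len(failed)} of the writes failed")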