Getting an SQLAlchemy error when trying to parallelize about 3000 task runs on a Dask cluster
# ask-community
t
Getting an SQLAlchemy error when trying to parallelize about 3000 task runs on a Dask cluster. Confusing to me as this is something that "just worked" on Prefect 1.0, but isn't working for me on 2.0...
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: <https://sqlalche.me/e/14/3o7r>)
t
I don't have any async code written, I am just using Dask to parallelize synchronous code I am migrating from Prefect 1.0
a
gotcha, I think the timeout settings could still be worth exploring
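For reference, the relevant knobs in Prefect 2.x are the Orion database settings; here is a minimal sketch of bumping them (the setting names are taken from the 2.x Orion settings and should be double-checked against your installed version):

# Sketch only: raise the Orion database timeouts via environment variables.
# These must be set before the Orion app / ephemeral API starts, i.e. before
# the flow run or `prefect orion start`.
import os

# How long to wait for a database connection before giving up (seconds).
os.environ["PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT"] = "60"
# How long to wait for individual queries to complete (seconds).
os.environ["PREFECT_ORION_DATABASE_TIMEOUT"] = "60"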
t
I am struggling to understand how a DB timeout is the cause of my issues during task submission with the DaskTaskRunner
Here is the flow I am running:
import os
from datetime import datetime

from prefect import flow, get_run_logger, unmapped
# In Prefect 2, DaskTaskRunner comes from the prefect-dask collection.
from prefect_dask import DaskTaskRunner

# AccountsClient, query_highwater, get_pages_list, and get_items_list
# are defined elsewhere in this project.


@flow(
    name="Subscriptions Flow",
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
    ),
)
def main():
    logger = get_run_logger()
    tstamp = str(datetime.today())
    token = os.environ.get("PSH_API_TOKEN")
    client = AccountsClient(
        token,
        token_url="https://auth.api.platform.sh/oauth2/token",
        api_url="http://api.platform.sh",
        client_id="platform-cli",
    )

    dest_dataset = os.environ.get("GCP_ACCOUNTS_DATASET")
    query = f"select max(CAST(id AS INT64)) as last_id from `{dest_dataset}.subscriptions_orion` where api_source = 'accounts'"
    highwater = query_highwater(query)
    # return Failed(message="Subscriptions table not found!")

    statuses = [
        "active",
        "provisioning",
        "provisioning failure",
        "suspended",
        "deleted",
    ]

    params = {
        "filter[id][value][0]": highwater,
        "filter[id][operator][0]": ">",
        "all": 1,
        "sort": "id",
        "filter[status][operator][0]": "IN",
    }

    for key, status in enumerate(statuses):
        params[f"filter[status][value][{key}]"] = status

    pages_list = get_pages_list(client, "subscriptions", params)
    items = get_items_list.map(
        unmapped(client), unmapped("subscriptions"), pages_list
    )
So I have narrowed it down to the number of tasks I am trying to parallelize. If I only do 15 tasks, it runs just fine. Trying to see what the limit is.
Seems like 1500 is a nice round number where the timeout happens.
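As a possible workaround (just a sketch, not something I've run yet), I could map over the pages in smaller batches so fewer task runs hit the API at once, reusing the same names as in the flow above:

# Hypothetical batching sketch: submit the mapped task runs in chunks so
# fewer than ~1500 are in flight at once. get_items_list, client, and
# pages_list are the same names used in the flow above.
batch_size = 500
all_items = []
for start in range(0, len(pages_list), batch_size):
    batch = pages_list[start : start + batch_size]
    futures = get_items_list.map(
        unmapped(client), unmapped("subscriptions"), batch
    )
    # Block on this batch before submitting the next one.
    all_items.extend(f.result() for f in futures)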
a
gotcha - I don't have any recommendations other than trying to narrow down where the issue is, whether it's in the DB or in Dask. If it's the DB, could it be a Postgres issue? Are you running this on Cloud 2.0 or a self-hosted/local Orion instance?
posting in the Dask Discourse might be worth a try too
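if it does turn out to be the Orion DB and you're self-hosted on the default SQLite, pointing Orion at Postgres is also worth trying; rough sketch (the connection details below are placeholders):

# Sketch only: point a self-hosted Orion API at Postgres instead of the
# default SQLite database. The user, password, host, and database name are
# placeholders. Set this before starting the server (e.g. before
# `prefect orion start`).
import os

os.environ["PREFECT_ORION_DATABASE_CONNECTION_URL"] = (
    "postgresql+asyncpg://prefect_user:prefect_password@localhost:5432/prefect"
)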