Tim Enders
08/16/2022, 8:54 PMsqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: <https://sqlalche.me/e/14/3o7r>)Anna Geller
Anna Geller
Tim Enders
08/16/2022, 9:08 PMAnna Geller
Tim Enders
08/17/2022, 1:34 PMTim Enders
08/17/2022, 1:44 PM@flow(
    name="Subscriptions Flow",
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
    ),
)
def main():
    logger = get_run_logger()
    tstamp = str(datetime.today())
    token = os.environ.get("PSH_API_TOKEN")
    client = AccountsClient(
        token,
        token_url="<https://auth.api.platform.sh/oauth2/token>",
        api_url="<http://api.platform.sh>",
        client_id="platform-cli",
    )
    dest_dataset = os.environ.get("GCP_ACCOUNTS_DATASET")
    query = f"select max(CAST(id AS INT64)) as last_id from `{dest_dataset}.subscriptions_orion` where api_source = 'accounts'"
    highwater = query_highwater(query)
    # return Failed(message="Subscriptions table not found!")
    statuses = [
        "active",
        "provisioning",
        "provisioning failure",
        "suspended",
        "deleted",
    ]
    params = {
        "filter[id][value][0]": highwater,
        "filter[id][operator][0]": ">",
        "all": 1,
        "sort": "id",
        "filter[status][operator][0]": "IN",
    }
    for key, status in enumerate(statuses):
        params[f"filter[status][value][{key}]"] = status
    pages_list = get_pages_list(client, "subscriptions", params)
    items = get_items_list.map(
        unmapped(client), unmapped("subscriptions"), pages_list
    )Tim Enders
08/17/2022, 4:49 PMTim Enders
08/17/2022, 5:16 PMAnna Geller
Anna Geller