Tim Enders
08/16/2022, 8:54 PMsqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: <https://sqlalche.me/e/14/3o7r>)
Anna Geller
Anna Geller
Tim Enders
08/16/2022, 9:08 PMAnna Geller
Tim Enders
08/17/2022, 1:34 PMTim Enders
08/17/2022, 1:44 PM@flow(
name="Subscriptions Flow",
task_runner=DaskTaskRunner(
cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
),
)
def main():
logger = get_run_logger()
tstamp = str(datetime.today())
token = os.environ.get("PSH_API_TOKEN")
client = AccountsClient(
token,
token_url="<https://auth.api.platform.sh/oauth2/token>",
api_url="<http://api.platform.sh>",
client_id="platform-cli",
)
dest_dataset = os.environ.get("GCP_ACCOUNTS_DATASET")
query = f"select max(CAST(id AS INT64)) as last_id from `{dest_dataset}.subscriptions_orion` where api_source = 'accounts'"
highwater = query_highwater(query)
# return Failed(message="Subscriptions table not found!")
statuses = [
"active",
"provisioning",
"provisioning failure",
"suspended",
"deleted",
]
params = {
"filter[id][value][0]": highwater,
"filter[id][operator][0]": ">",
"all": 1,
"sort": "id",
"filter[status][operator][0]": "IN",
}
for key, status in enumerate(statuses):
params[f"filter[status][value][{key}]"] = status
pages_list = get_pages_list(client, "subscriptions", params)
items = get_items_list.map(
unmapped(client), unmapped("subscriptions"), pages_list
)
Tim Enders
08/17/2022, 4:49 PMTim Enders
08/17/2022, 5:16 PMAnna Geller
Anna Geller