Tim Enders
08/30/2022, 1:56 PM
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
Pickle error on clients in Prefect 2.0?
Anna Geller
Tim Enders
08/30/2022, 3:13 PM
(`bqclient`) throws the pickling error.
def main(full_active_load=False, full_load=False):
    """Fetch subscription items from the accounts API and load them to BigQuery.

    The function queries the current maximum subscription id for the
    ``accounts`` source, builds request parameters from that value and the
    two flags, lists the available pages, and maps ``get_items_list`` over
    at most the first 100 entries of that list. The collected rows are
    transformed and passed to ``load_df_bq`` together with a BigQuery client.

    Args:
        full_active_load: Forwarded unchanged to ``get_params``.
        full_load: Forwarded unchanged to ``get_params``.

    Returns:
        ``Completed`` when no rows were collected or when the load result is
        a ``LoadJob`` whose state is ``"DONE"``; the ``Failed`` object itself
        when ``load_df_bq`` returns one; otherwise a new ``Failed`` with the
        message ``"Load Failure"``.
    """
    logger = get_run_logger()  # obtained as in the original; not referenced below

    client = AccountsClient(
        os.environ.get("PSH_API_TOKEN"),
        token_url="REDACTED",
        api_url="REDACTED",
        client_id="REDACTED",
    )

    # Highest id already stored marks where the incremental pull resumes.
    dest_dataset = os.environ.get("GCP_ACCOUNTS_DATASET")
    query = f"select max(CAST(id AS INT64)) as last_id from `{dest_dataset}.subscriptions_orion` where api_source = 'accounts'"
    params = get_params(query_highwater(query), full_active_load, full_load)

    # Only the first 100 pages are fanned out; client and resource name are
    # held constant across the mapped calls via unmapped().
    page_batch = get_pages_list(client, "subscriptions", params)[:100]
    futures = get_items_list.map(
        unmapped(client), unmapped("subscriptions"), page_batch
    )

    # Flatten the per-page results into a single list, in page order.
    sub_data = [row for future in futures for row in future.result()]
    if not sub_data:
        return Completed(message="No data from the API")

    frame = transform_data(sub_data)
    # NOTE(review): the surrounding discussion reports a pickling error when
    # this client object is handed to load_df_bq — confirm whether the client
    # should instead be created inside that task.
    bqclient = bigquery.Client()
    load_outcome = load_df_bq(bqclient, frame)

    if isinstance(load_outcome, LoadJob) and load_outcome.state == "DONE":
        return Completed(message="Load Finished!")
    if isinstance(load_outcome, Failed):
        return load_outcome
    return Failed(message="Load Failure")
Anna Geller
Tim Enders
08/30/2022, 4:25 PM
Anna Geller
Tim Enders
08/30/2022, 4:30 PM