jaehoon
05/04/2021, 2:30 AMUnexpected error: ReferenceError('weakly-referenced object no longer exists')
on prefect Run logs
api res count is almost 2000, error no occurs in local env
someone please help me..!Kevin Kho
jaehoon
05/04/2021, 3:48 AMUnexpected error: ReferenceError('weakly-referenced object no longer exists')
Traceback (most recent call last):
File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\task_runner.py", line 900, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\results\local_result.py", line 116, in write
value = self.serializer.serialize(new.value)
File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\serializers.py", line 73, in serialize
return cloudpickle.dumps(value)
File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 563, in dump
return Pickler.dump(self, obj)
File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 652, in reducer_override
elif isinstance(obj, types.FunctionType):
ReferenceError: weakly-referenced object no longer existsjaehoon
05/04/2021, 3:49 AMKevin Kho
jaehoon
05/04/2021, 5:29 AMdef register_google_ads_ETL():
with Flow("GoogleAds-ETL") as flow:
account_secrets = get_all_account_secrets("GOOGLE_ADS")
# Extract
ad_managers = extract_googleads_ad_managers.map(account_secrets)
ad_campaigns = extract_google_ad_campaigns.map(account_secrets)
ad_groups = extract_google_ad_groups.map(account_secrets)
ads = extract_google_ads.map(account_secrets)
ad_daily_reports = extract_google_ad_daily_reports.map(account_secrets)
# Load
load_google_ad_datas.map(ad_managers, ad_campaigns, ad_groups, ads, ad_daily_reports)
flow.register(project_name="pipeline")
@task
def get_all_account_secrets(service_name) -> list[AccountSecret]:
with PsqlClient().session() as session:
account_secrets = AccountRepo(session).get_all_account_secrets(service_name)
session.close()
return account_secrets
@task
def extract_googleads_ad_managers(account_secret: AccountSecret) -> list[AdManager]:
ga = GoogleAdsClient(account_secret)
return ga.get_ad_managers()
@task
def extract_google_ad_campaigns(account_secret: AccountSecret) -> list[AdCampaign]:
ga = GoogleAdsClient(account_secret)
ad_managers = ga.get_ad_managers()
return ga.get_ad_campaigns(ad_managers)
@task
def extract_google_ad_groups(account_secret: AccountSecret) -> list[AdGroup]:
ga = GoogleAdsClient(account_secret)
ad_managers = ga.get_ad_managers()
return ga.get_ad_groups(ad_managers)
@task
def extract_google_ads(account_secret: AccountSecret) -> list[Ad]:
ga = GoogleAdsClient(account_secret)
ad_managers = ga.get_ad_managers()
return ga.get_ads(ad_managers)
@task
def extract_google_ad_daily_reports(account_secret: AccountSecret) -> list[AdDailyReport]:
ga = GoogleAdsClient(account_secret)
ad_managers = ga.get_ad_managers()
return ga.get_ad_daily_reports(ad_managers)
@task
def load_google_ad_datas(ad_managers, ad_campaigns, ad_groups, ads, ad_daily_reports):
<http://logger.info|logger.info>(ad_managers)
<http://logger.info|logger.info>(ad_campaigns)
<http://logger.info|logger.info>(ad_groups)
<http://logger.info|logger.info>(ads)
<http://logger.info|logger.info>(ad_daily_reports)
with PsqlClient().session() as session:
if ad_managers is None:
<http://logger.info|logger.info>("Empty registered product options")
return
for ad_manager in ad_managers:
session.merge(ad_manager)
session.flush()
<http://logger.info|logger.info>(f"fetched registered AdManager. 1st item: [{ad_managers[0].name}]")
if ad_campaigns is None:
<http://logger.info|logger.info>("Empty registered product options")
for ad_campaign in ad_campaigns:
session.merge(ad_campaign)
session.flush()
<http://logger.info|logger.info>(f"fetched registered AdCampaigns. 1st item: [{ad_campaigns[0].name}]")
if ad_groups is None:
<http://logger.info|logger.info>("Empty registered product options")
for ad_group in ad_groups:
session.merge(ad_group)
session.flush()
<http://logger.info|logger.info>(f"fetched registered AdGroups. 1st item: [{ad_groups[0].name}]")
if ads is None:
<http://logger.info|logger.info>("Empty registered product options")
for ad in ads:
session.merge(ad)
session.flush()
<http://logger.info|logger.info>(f"fetched registered Ads. 1st item: [{ads[0].name}]")
if ad_daily_reports is None:
<http://logger.info|logger.info>("Empty registered product options")
try:
# TODO: Set IGNORE option on duplicate
session.bulk_save_objects(ad_daily_reports)
session.commit()
except:
logger.error(traceback.format_exc())
session.rollback()
<http://logger.info|logger.info>(f"fetched registered AdDailyReports. 1st item: [{ad_daily_reports[0].name}]")
if __name__ == '__main__':
register_google_ads_ETL()
jaehoon
05/04/2021, 5:31 AMads = extract_google_ads.map(account_secrets)
error occur here
i guess too many api generate errorjaehoon
05/04/2021, 5:32 AMdef get_ads(self, ad_managers):
ads = []
for ad_manager in ad_managers:
ga_service = self.client.get_service("GoogleAdsService")
query = """
SELECT
ad_group_ad.ad.id,
ad_group_ad.ad.name,
ad_group_ad.ad.final_urls,
ad_group.id,
campaign.id
FROM ad_group_ad
ORDER BY ad_group_ad.ad.id"""
response = ga_service.search_stream(customer_id=ad_manager.id, query=query)
for batch in response:
for row in batch.results:
ads.append(Ad(
id=str(row.ad_group_ad.ad.id),
name=row.ad_group_ad.ad.name,
ad_campaign_id=row.campaign.id,
ad_group_id=row.ad_group.id,
landing_page=row.ad_group_ad.ad.final_urls
))
return ads
Kevin Kho
jaehoon
05/05/2021, 1:51 AM