# ask-community
j
Does anyone know what is going on here? When getting API responses from google-ads, I keep getting this error:
Unexpected error: ReferenceError('weakly-referenced object no longer exists')
in the Prefect run logs. The API result count is almost 2,000, and the error does not occur in my local env. Someone please help me..!
k
Hi @jaehoon! You don't see an error in the local environment? Are you using the API inside a task?
j
Unexpected error: ReferenceError('weakly-referenced object no longer exists')
Traceback (most recent call last):
  File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\task_runner.py", line 900, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\results\local_result.py", line 116, in write
    value = self.serializer.serialize(new.value)
  File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\prefect\engine\serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "c:\users\krims\data-pipline\data-pipeline\venv\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 652, in reducer_override
    elif isinstance(obj, types.FunctionType):
ReferenceError: weakly-referenced object no longer exists
Here is the error message. Yes, I use the API inside a task.
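Worth noting from the traceback: the failure happens inside `LocalResult.write` → `cloudpickle.dumps`, i.e. Prefect is pickling the task's return value to checkpoint it. If I recall correctly, Prefect 1.x enables checkpointing for backend (Cloud/Server) runs but not for a bare local `flow.run()`, which would explain why this only appears on registered runs. A minimal sketch, assuming Prefect 1.x, of turning checkpointing off per task so the result never reaches cloudpickle (`fetch_data` is a hypothetical stand-in for the real extract task):

from prefect import task

# Prefect 1.x sketch: checkpoint=False skips LocalResult.write, so the
# return value is never handed to cloudpickle. Only appropriate when you
# don't need persisted results (e.g. for flow restarts).
@task(checkpoint=False)
def fetch_data(n):
    return list(range(n))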
k
It seems like you have a connection/cursor that is being overridden. Can you share your code for that task?
j
import logging
import traceback

from prefect import Flow, task

# project-local imports (PsqlClient, AccountRepo, GoogleAdsClient, models) omitted

logger = logging.getLogger(__name__)


def register_google_ads_ETL():
    with Flow("GoogleAds-ETL") as flow:
        account_secrets = get_all_account_secrets("GOOGLE_ADS")
        # Extract
        ad_managers = extract_googleads_ad_managers.map(account_secrets)
        ad_campaigns = extract_google_ad_campaigns.map(account_secrets)
        ad_groups = extract_google_ad_groups.map(account_secrets)
        ads = extract_google_ads.map(account_secrets)
        ad_daily_reports = extract_google_ad_daily_reports.map(account_secrets)

        # Load
        load_google_ad_datas.map(ad_managers, ad_campaigns, ad_groups, ads, ad_daily_reports)

    flow.register(project_name="pipeline")


@task
def get_all_account_secrets(service_name) -> list[AccountSecret]:
    with PsqlClient().session() as session:
        account_secrets = AccountRepo(session).get_all_account_secrets(service_name)
    # the context manager already closes the session; no explicit close needed
    return account_secrets

@task
def extract_googleads_ad_managers(account_secret: AccountSecret) -> list[AdManager]:
    ga = GoogleAdsClient(account_secret)
    return ga.get_ad_managers()


@task
def extract_google_ad_campaigns(account_secret: AccountSecret) -> list[AdCampaign]:
    ga = GoogleAdsClient(account_secret)
    ad_managers = ga.get_ad_managers()
    return ga.get_ad_campaigns(ad_managers)


@task
def extract_google_ad_groups(account_secret: AccountSecret) -> list[AdGroup]:
    ga = GoogleAdsClient(account_secret)
    ad_managers = ga.get_ad_managers()
    return ga.get_ad_groups(ad_managers)


@task
def extract_google_ads(account_secret: AccountSecret) -> list[Ad]:
    ga = GoogleAdsClient(account_secret)
    ad_managers = ga.get_ad_managers()
    return ga.get_ads(ad_managers)


@task
def extract_google_ad_daily_reports(account_secret: AccountSecret) -> list[AdDailyReport]:
    ga = GoogleAdsClient(account_secret)
    ad_managers = ga.get_ad_managers()
    return ga.get_ad_daily_reports(ad_managers)


@task
def load_google_ad_datas(ad_managers, ad_campaigns, ad_groups, ads, ad_daily_reports):
    logger.info(ad_managers)
    logger.info(ad_campaigns)
    logger.info(ad_groups)
    logger.info(ads)
    logger.info(ad_daily_reports)

    with PsqlClient().session() as session:
        # guard against both None and empty lists before indexing [0] below
        if not ad_managers:
            logger.info("Empty AdManagers")
            return
        for ad_manager in ad_managers:
            session.merge(ad_manager)
        session.flush()
        logger.info(f"fetched registered AdManagers. 1st item: [{ad_managers[0].name}]")

        if not ad_campaigns:
            logger.info("Empty AdCampaigns")
            return
        for ad_campaign in ad_campaigns:
            session.merge(ad_campaign)
        session.flush()
        logger.info(f"fetched registered AdCampaigns. 1st item: [{ad_campaigns[0].name}]")

        if not ad_groups:
            logger.info("Empty AdGroups")
            return
        for ad_group in ad_groups:
            session.merge(ad_group)
        session.flush()
        logger.info(f"fetched registered AdGroups. 1st item: [{ad_groups[0].name}]")

        if not ads:
            logger.info("Empty Ads")
            return
        for ad in ads:
            session.merge(ad)
        session.flush()
        logger.info(f"fetched registered Ads. 1st item: [{ads[0].name}]")

        if not ad_daily_reports:
            logger.info("Empty AdDailyReports")
            return
        try:
            # TODO: Set IGNORE option on duplicate
            session.bulk_save_objects(ad_daily_reports)
            session.commit()
        except Exception:
            logger.error(traceback.format_exc())
            session.rollback()
        logger.info(f"fetched registered AdDailyReports. 1st item: [{ad_daily_reports[0].name}]")


if __name__ == '__main__':
    register_google_ads_ETL()
ads = extract_google_ads.map(account_secrets)
I guess the error occurs here. Maybe too many API calls generate the error?
def get_ads(self, ad_managers):
    ads = []
    # the service and query are identical for every manager, so build them once
    ga_service = self.client.get_service("GoogleAdsService")
    query = """
            SELECT
              ad_group_ad.ad.id,
              ad_group_ad.ad.name,
              ad_group_ad.ad.final_urls,
              ad_group.id,
              campaign.id
            FROM ad_group_ad
            ORDER BY ad_group_ad.ad.id"""

    for ad_manager in ad_managers:
        response = ga_service.search_stream(customer_id=ad_manager.id, query=query)

        for batch in response:
            for row in batch.results:
                ads.append(Ad(
                    id=str(row.ad_group_ad.ad.id),
                    name=row.ad_group_ad.ad.name,
                    ad_campaign_id=row.campaign.id,
                    ad_group_id=row.ad_group.id,
                    landing_page=row.ad_group_ad.ad.final_urls
                ))
    return ads
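A hedged guess at the root cause: the rows yielded by `search_stream` are proto-plus objects that keep references to gRPC internals, and if any of those values (e.g. `final_urls`, a repeated proto field) survive into the task's return value, checkpointing will try to cloudpickle them and can raise exactly this `ReferenceError`. A minimal sketch of a defensive conversion, assuming the row shape from the query above (`row_to_plain` is a hypothetical helper):

def row_to_plain(row) -> dict:
    # Copy every value into plain Python types so nothing returned from
    # the task keeps a reference to proto/gRPC internals.
    return {
        "id": str(row.ad_group_ad.ad.id),
        "name": str(row.ad_group_ad.ad.name),
        "ad_campaign_id": int(row.campaign.id),
        "ad_group_id": int(row.ad_group.id),
        "landing_page": [str(u) for u in row.ad_group_ad.ad.final_urls],
    }

`Ad(**row_to_plain(row))` would then build the model from plain values, assuming `Ad` accepts these keyword arguments.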
k
Yes, that seems to be the case. Maybe you can try limiting the number of workers of the DaskExecutor? Not sure this can be parallelized.
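A sketch of that suggestion, assuming Prefect 1.x with a local Dask cluster (`cluster_kwargs` is forwarded to `distributed.LocalCluster`; the numbers are illustrative, not a recommendation):

from prefect.executors import DaskExecutor

# Cap parallelism so fewer Google Ads clients/streams are alive at once.
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1},
)

Set this on the flow before `flow.register(...)` so the registered flow runs with the bounded executor.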
j
Thanks, I will try it! Thank you for helping 🙂
👍 1