Bob Colner
05/11/2022, 9:45 PM
Traceback (most recent call last):
File "/opt/miniconda3/envs/orion/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 40, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function asa_get_campaigns at 0x1087c4160>: it's not the same object as __main__.asa_get_campaigns
@task(
retries=3,
retry_delay_seconds=60,
tags=['asa_api'],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
logger = get_run_logger()
    logger.info(f'fetching campaigns for: org: {org_id}, date: {date}')
return asa_api.get_campaigns(org_id, date, access_token)
The asa_api.get_campaigns definition:
def get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
requestBody = {
'startTime': date,
'endTime': date,
'selector': {
'orderBy': [{'field': 'campaignId', 'sortOrder': 'ASCENDING'}],
'conditions': [{'field': 'campaignStatus', 'operator': 'EQUALS', 'values': ['ENABLED']}]
},
'returnRowTotals': 'true'
}
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}', 'X-AP-Context': f'orgId={org_id}'}
    response = requests.post(CAMPAIGNS_URL, data=json.dumps(requestBody, ensure_ascii=False), headers=headers)
jsonResponse = response.json()
if jsonResponse["data"] == None or jsonResponse['pagination']['totalResults'] == 0:
return pd.DataFrame()
rows = jsonResponse['data']['reportingDataResponse']['row']
format_row = lambda row : {
'spend_date': date,
'org_id': org_id,
'app_name': row['metadata']['app']['appName'],
'campaign_id': row['metadata']['campaignId'],
'campaign_name': row['metadata']['campaignName'],
'modification_at': row['metadata']['modificationTime'],
'inserted_at': datetime.utcnow().isoformat(),
'spend_amount': float(row['total']['localSpend']['amount']),
'impressions': row['total']['impressions'],
'taps': row['total']['taps'],
'installs': row['total']['installs'],
'country': row['metadata']['countriesOrRegions'][0],
'json_payload': json.dumps(row)
}
formatted_rows = [*map(format_row, rows)]
df = pd.DataFrame(formatted_rows).convert_dtypes()
df['spend_amount'] = df['spend_amount'].astype(float)
return df
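Pickle serializes a function by reference, i.e. by its module and qualified name, so the error above appears whenever the name in __main__ no longer points at the exact object being pickled, for example after the module is re-run or re-imported in a Dask worker process. A minimal sketch of that failure mode (illustrative only, not code from this thread):

import pickle

def f():
    pass

g = f          # keep a handle to the original function object

def f():       # rebind the name "f" in __main__
    pass

# pickle resolves g by name, finds the new __main__.f, and raises:
# PicklingError: Can't pickle <function f ...>: it's not the same object as __main__.f
pickle.dumps(g)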
Zanie
05/11/2022, 10:33 PM
Try adding an if __name__ == "__main__" guard before calling the flow.
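A minimal sketch of that fix, assuming a Prefect 2 flow with a Dask task runner (the task, flow, and names below are invented for illustration): multiprocessing-based Dask workers re-import the __main__ module, so a module-level flow call would re-execute on every import, and the guard restricts it to direct script execution.

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
def say_hi(name: str) -> str:
    return f"hi {name}"

@flow(task_runner=DaskTaskRunner())
def my_flow() -> str:
    return say_hi("world").result()  # the task call returns a future; .result() blocks

if __name__ == "__main__":  # skipped when worker processes re-import this module
    my_flow()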
Zanie
05/12/2022, 3:38 PM
import time
from prefect import task, flow, get_run_logger
from prefect.task_runners import (
SequentialTaskRunner,
ConcurrentTaskRunner,
DaskTaskRunner,
)
from typing import List
from unittest.mock import MagicMock
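# stand-ins for the real dependencies (pandas, GCP credentials, BigQuery, the ASA client)
# so this example is self-contained and runnable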
pd = MagicMock()
GcpCredentials = MagicMock()
bigquery_insert_stream = MagicMock()
asa_api = MagicMock()
@task(
retries=3,
retry_delay_seconds=60,
tags=["asa_api"],
)
def asa_get_campaigns(org_id: str, date: str, access_token: str) -> pd.DataFrame:
logger = get_run_logger()
    logger.info(f"fetching campaigns for: org: {org_id}, date: {date}")
return asa_api.get_campaigns(org_id, date, access_token)
@task(
retries=3,
retry_delay_seconds=60,
tags=["asa_api"],
)
def asa_get_adgroups(
campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
logger = get_run_logger()
    logger.info(
f"fetching adgroups for: org: {org_id}, campaign: {campaign_id}, date: {date}"
)
return asa_api.get_adgroups(campaign_id, org_id, date, access_token)
@task(
retries=3,
retry_delay_seconds=60,
tags=["asa_api"],
)
def asa_get_keywords(
campaign_id: str, org_id: str, date: str, access_token: str
) -> pd.DataFrame:
logger = get_run_logger()
    logger.info(
f"fetching keywords for: {org_id}, campaign: {campaign_id}, date: {date}"
)
return asa_api.get_keywords(campaign_id, org_id, date, access_token)
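# a local multi-process Dask cluster: 4 worker processes with 2 threads each, no dashboard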
dask_local = DaskTaskRunner(
cluster_kwargs={
"processes": True,
"n_workers": 4,
"threads_per_worker": 2,
"dashboard_address": None,
}
)
@flow(task_runner=dask_local)
def fetch_asa_data(
start_date: str, end_date: str, orgs: List[str] = ["117960", "2657930", "3737470"]
) -> bool:
start_at = time.time()
logger = get_run_logger()
asa_token = asa_api.get_access_token()
gcp_credentials = GcpCredentials(project="emerald-skill-201716")
date_range = [
d.date().isoformat() for d in pd.date_range(start=start_date, end=end_date)
]
for date in date_range:
for org_id in orgs:
dfc_future = asa_get_campaigns(org_id, date, asa_token)
dfc = dfc_future.result()
if dfc is None or len(dfc) == 0:
                logger.info(f"No campaign data for org: {org_id}, date: {date}")
continue
rows_c = dfc.to_dict("records")
result = bigquery_insert_stream(
dataset="apple_search_ads",
table="campaigns_stream",
records=rows_c,
gcp_credentials=gcp_credentials,
)
for row in dfc.itertuples():
dfa_future = asa_get_adgroups(row.campaign_id, org_id, date, asa_token)
dfa = dfa_future.result()
if dfa is None or len(dfa) == 0:
                    logger.info(
f"No adgroup data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
)
continue
rows_a = dfa.to_dict("records")
result = bigquery_insert_stream(
dataset="apple_search_ads",
table="adgroups_stream",
records=rows_a,
gcp_credentials=gcp_credentials,
)
dfk_future = asa_get_keywords(row.campaign_id, org_id, date, asa_token)
dfk = dfk_future.result()
if dfk is None or len(dfk) == 0:
                    logger.info(
f"No keyword data for org: {org_id}, date: {date}, campaign_id: {row.campaign_id}"
)
continue
rows_k = dfk.to_dict("records")
result = bigquery_insert_stream(
dataset="apple_search_ads",
table="keywords_stream",
records=rows_k,
gcp_credentials=gcp_credentials,
)
end_at = time.time()
    logger.info(f"flow run time (mins): {(end_at - start_at) / 60}")
return True
if __name__ == "__main__":
fetch_asa_data(start_date="2022-05-06", end_date="2022-05-06")