Daniel Shinoda
01/19/2023, 3:57 AM@task
def query(client, customer_id):
current_datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
json_file = f'query_results_{customer_id}_{current_datetime}.json'
with open(json_file, mode='w') as file:
for row in rows:
data = {
(...)
}
json.dump(data, file)
file.write('\n')
print(file.name,' created with ',len(rows),'rows')
return query_to_json
@task
def upload_to_gcs( file_to_send,bucket_name):
gcp_credentials_block = GcpCredentials.load("ds-projects")
cloud_storage_upload_blob_from_file(
file_to_send, bucket_name, "blob", gcp_credentials_block)
return f'has been uploaded to {bucket_name}'
@flow
def main(customer_id):
secret_block_refresh_token = Secret.load("g-ads-refresh-token")
secret_block_dev_token = Secret.load("g-ads-developer-token")
secret_block_client_secret = Secret.load("g-ads-client-secret")
credentials = {
XXXXXXXXX
}
googleads_client = GoogleAdsClient.load_from_dict(credentials)
file_to_send = query(googleads_client, customer_id)
bucket_name = "g_ads_geo_report_campaign"
upload_to_gcs.fn(file_to_send,bucket_name)
if __name__ == "__main__":
main("xxxxx")
Anna Geller
01/19/2023, 3:59 AMDaniel Shinoda
01/19/2023, 4:07 AM