Jiaqiang Li
03/29/2024, 3:13 PM
`Task parameter introspection took … seconds, exceeding PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD of 10.0. Try wrapping large task parameters with prefect.utilities.annotations.quote for increased performance, e.g. my_task(quote(param)). To disable this message set PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD=0.`
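The fix the warning suggests is part of Prefect's public API: wrapping a large task argument in quote(...) from prefect.utilities.annotations stops the engine from walking the whole object looking for upstream futures. A minimal sketch of what that looks like; the task names mirror the code shared later in this thread, and the payload here is a stand-in:

from prefect import flow, task
from prefect.utilities.annotations import quote

@task
def pull_data_from_api():
    # stand-in payload; the real task calls a paginated API (see below)
    return [{"id": i} for i in range(100_000)]

@task
def upload_to_gcs(all_records):
    print(f"would upload {len(all_records)} records")

@flow
def example_flow():
    all_records = pull_data_from_api()
    # quote() skips Prefect's introspection of the large list, which is
    # what triggers the PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD warning;
    # the data the task receives is unchanged.
    upload_to_gcs(quote(all_records))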
08:50:26 AM | upload_to_gcs-0 | prefect.task_runs | INFO
Process for flow run 'incredible-vicugna' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
08:50:28 AM | prefect.flow_runs.runner | INFO
Reported flow run '28fc1c8a-b46f-41d6-98a9-6aa82a95150b' as crashed: Flow run process exited with non-zero status code -9.
Robin Niel
03/29/2024, 3:17 PM
Jiaqiang Li
03/29/2024, 3:19 PM
Robin Niel
03/29/2024, 3:20 PM
Robin Niel
03/29/2024, 3:21 PM
Jiaqiang Li
03/29/2024, 3:22 PM
Robin Niel
03/29/2024, 3:23 PM
Robin Niel
03/29/2024, 3:23 PM
Robin Niel
03/29/2024, 3:24 PM
Jiaqiang Li
03/29/2024, 3:24 PM
# Imports used by the snippets in this thread
import json
import requests
from google.cloud import storage
from google.oauth2 import service_account
from prefect import task, flow

# url, headers, request_body, gcs_service_account, GCS_bucket_name and
# file_path are defined elsewhere in the script and not shown in the thread.

# Define the Python function to pull data from the API
@task
def pull_data_from_api():
    all_records = []
    page = 1
    while True:
        params = {
            "page": page,
            "page_size": 5000
        }
        response = requests.post(url, headers=headers, json=request_body, params=params)
        if response.status_code == 200:
            data = response.json()
            records = data.get('conversions', [])
            if not records:
                # an empty page means we've paged past the last record
                break
            all_records.extend(records)
            page += 1
        else:
            print(f"Request failed with status code {response.status_code}")
            break
    return all_records

# Define the Python function to upload data to Google Cloud Storage
@task
def upload_to_gcs(all_records):
    # Define the GCS bucket connection
    credentials_json_string = (f'{gcs_service_account}').replace('\\n', '\n').replace("'", '"')
    credentials_json_object = json.loads(credentials_json_string)
    credentials = service_account.Credentials.from_service_account_info(credentials_json_object)
    storage_client = storage.Client(project=credentials_json_object['project_id'], credentials=credentials)
    if all_records:
        json_data = json.dumps(all_records)  # note: unused (discussed below)
        data_json = '\n'.join(json.dumps(item) for item in all_records)
        bucket = storage_client.get_bucket(f'{GCS_bucket_name}')
        blob = bucket.blob(file_path)
        blob.upload_from_string(data=data_json, content_type='application/json')
        print("Data uploaded to the blob.")
Jiaqiang Li
03/29/2024, 3:25 PM
Robin Niel
03/29/2024, 3:26 PM
Robin Niel
03/29/2024, 3:26 PM
Jiaqiang Li
03/29/2024, 3:28 PM
Jiaqiang Li
03/29/2024, 3:29 PM
json_data = json.dumps(all_records)
data_json = '\n'.join(json.dumps(item) for item in all_records)
Robin Niel
03/29/2024, 3:29 PM
Jiaqiang Li
03/29/2024, 3:29 PM
Robin Niel
03/29/2024, 3:29 PM
Robin Niel
03/29/2024, 3:30 PM
Jiaqiang Li
03/29/2024, 3:30 PM
Robin Niel
03/29/2024, 3:31 PM
Jiaqiang Li
03/29/2024, 3:31 PM
Jeff Hale
03/29/2024, 3:44 PM
Jiaqiang Li
03/29/2024, 3:45 PM
Jiaqiang Li
03/29/2024, 4:25 PM
Jiaqiang Li
03/29/2024, 4:25 PM
Robin Niel
03/29/2024, 4:29 PM
Jiaqiang Li
03/29/2024, 4:29 PM
Jiaqiang Li
03/29/2024, 4:29 PM
@task
def pull_data_from_api_to_gcs_and_bigquery():
    all_records = []
    page = 1
    while True:
        params = {
            "page": page,
            "page_size": 5000
        }
        response = requests.post(url, headers=headers, json=request_body, params=params)
        if response.status_code == 200:
            data = response.json()
            records = data.get('conversions', [])
            if not records:
                break
            all_records.extend(records)
            page += 1
        else:
            print(f"Request failed with status code {response.status_code}")
            break
    # return all_records
    # Upload data to Google Cloud Storage
    # Define the GCS bucket connection
    credentials_json_string = (f'{gcs_service_account}').replace('\\n', '\n').replace("'", '"')
    credentials_json_object = json.loads(credentials_json_string)
    credentials = service_account.Credentials.from_service_account_info(credentials_json_object)
    storage_client = storage.Client(project=credentials_json_object['project_id'], credentials=credentials)
    if all_records:
        json_data = json.dumps(all_records)
        data_json = '\n'.join(json.dumps(item) for item in all_records)
        bucket = storage_client.get_bucket(f'{GCS_bucket_name}')
        blob = bucket.blob(file_path)
        blob.upload_from_string(data=data_json, content_type='application/json')
        print("Data uploaded to the blob.")
Jiaqiang Li
03/29/2024, 4:30 PM
# Define the Prefect flow
@flow
def everflow_clicks_conversions_api_to_gcp():
    pull_data_from_api_to_gcs_and_bigquery()
    print("Data uploaded to GCS and BigQuery")

if __name__ == "__main__":
    everflow_clicks_conversions_api_to_gcp()
Robin Niel
03/29/2024, 4:33 PM
Jiaqiang Li
03/29/2024, 4:34 PM
Process for flow run 'small-crocodile' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
10:18:42 AM | prefect.flow_runs.runner | INFO
Reported flow run '34db97bf-dd2c-4fce-9555-584775deff25' as crashed: Flow run process exited with non-zero status code -9.
Jiaqiang Li
03/29/2024, 4:36 PM
Robin Niel
03/29/2024, 4:48 PM
Jiaqiang Li
03/29/2024, 4:49 PM
Jiaqiang Li
03/29/2024, 4:51 PM