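# NOTE(review): the lines below look like chat-transcript residue
# (usernames and timestamps) copied along with the code.
# Jkop
# 03/08/2024, 10:28 AM
# Ryan
# 03/09/2024, 11:35 AM
# Jkop
# 03/10/2024, 9:29 PM
import pandas as pd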
import httpx
from prefect import variables
from prefect import flow, task, get_run_logger
from prefect_gcp.bigquery import BigQueryWarehouse
from prefect.utilities.asyncutils import run_sync_in_worker_thread
# Configuration resolved once, at import time, from the Prefect server:
# a saved BigQueryWarehouse block and two Prefect Variables.
bigquery_warehouse_block = BigQueryWarehouse.load("bigquery-dwh")
API_KEY = variables.get('api_key')
API_URL = variables.get('api_url')
# NOTE(review): presumably variables.get returns None for an unset variable.
# Fail fast with an explicit message instead of an opaque TypeError from the
# "Bearer " concatenation below (missing key) or a late failure inside the
# HTTP request (missing URL).
if not API_KEY or not API_URL:
    raise RuntimeError(
        "Prefect variables 'api_key' and 'api_url' must both be set "
        "before this module is imported."
    )
headers = {
    "Authorization": "Bearer " + API_KEY,
    "Content-Type": "application/json",
}
# Destination of the load; interpolated into SQL as `{schema}.{table}`.
table = 'orders'
schema = 'multi_vendor'
@task
async def fetch_new_data(API_URL, headers):
    """Fetch the current list of orders from the orders API.

    Parameters
    ----------
    API_URL:
        Endpoint to send the GET request to.
    headers:
        Request headers (authorization and content type).

    Returns
    -------
    list
        The ``orders`` entry of the JSON response body; ``[]`` when the body
        is empty or has no ``orders`` key.

    Raises
    ------
    httpx.HTTPStatusError
        When ``raise_for_status()`` rejects the response status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(API_URL, headers=headers)
    # Check the status before touching the body. Testing only for == 200
    # would let other success codes (e.g. 204) fall through and return None.
    response.raise_for_status()
    # A success response without a body (e.g. 204 No Content) means "no orders".
    if not response.content:
        return []
    return response.json().get('orders', [])
@task
def get_api_data(data_from_api) -> pd.DataFrame:
    """Flatten API orders into a DataFrame with one row per line item.

    Parameters
    ----------
    data_from_api:
        Iterable of order dicts. Each order must provide ``order_name``,
        ``fulfillment``, ``country`` and a ``line_items`` list. Each line item
        must provide the remaining fields used below. ``None`` is treated as
        "no orders".

    Returns
    -------
    pd.DataFrame
        Columns in the fixed order listed in ``columns``. An empty input
        still yields those columns, so later column access does not fail.
        ``date_add``/``date_upd`` values that cannot be parsed become NaT.

    Raises
    ------
    Exception
        Any error while reading or converting a field (e.g. KeyError for a
        missing field, ValueError for a non-numeric id). The error is printed
        and re-raised, so the Prefect task run fails instead of returning None.
    """
    columns = [
        'shopify_order_id', 'order_name', 'fulfillment', 'seller_id',
        'product_name', 'product_price', 'total_quantity', 'actual_shipping',
        'country', 'date_add', 'date_upd',
    ]
    rows = []
    try:
        for order in data_from_api or []:
            for item in order['line_items']:
                rows.append({
                    'shopify_order_id': int(item['shopify_order_id']),
                    # str() so a numeric order_name also works; '#' prefixes are stripped.
                    'order_name': int(str(order['order_name']).replace('#', '')),
                    'fulfillment': str(order['fulfillment']),
                    'seller_id': int(item['seller_id']),
                    'product_name': str(item['product_name']),
                    'product_price': float(item['product_price']),
                    'total_quantity': int(item['total_quantity']),
                    'actual_shipping': float(item['actual_shipping']),
                    'country': str(order['country']),
                    'date_add': pd.to_datetime(item['date_add'], errors='coerce'),
                    'date_upd': pd.to_datetime(item['date_upd'], errors='coerce'),
                })
    except Exception as e:
        print(f'Error processing API DataFrame: {e}')
        raise
    return pd.DataFrame(rows, columns=columns)
@task
def prepare_data_for_bigquery(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise ``date_add``/``date_upd`` to UTC and fill missing values.

    Both columns are converted with ``pd.to_datetime(..., utc=True)``. Missing
    values (NaT/None) are then replaced with the fixed placeholder
    ``1970-01-01 00:30:37`` UTC; they are not left as NULL.

    Parameters
    ----------
    df:
        Frame containing ``date_add`` and ``date_upd`` columns.

    Returns
    -------
    pd.DataFrame
        A new frame with the two columns replaced. The input frame is not
        modified.

    Raises
    ------
    Exception
        E.g. KeyError if a column is missing. The error is printed and
        re-raised so the task run fails instead of returning None.
    """
    # NOTE(review): the meaning of this specific placeholder instant is not
    # visible here; confirm with downstream consumers before changing it.
    placeholder = pd.Timestamp("1970-01-01 00:30:37", tz="UTC")
    try:
        return df.assign(**{
            column: pd.to_datetime(df[column], utc=True).fillna(placeholder)
            for column in ('date_add', 'date_upd')
        })
    except Exception as e:
        print(f'Error filling missing timestamps with placeholder: {e}')
        raise
@task
def load_to_bigquery_from_data_frame(schema, table, data_frame, bigquery_warehouse_block):
    """Create ``schema.table`` if needed and insert rows not already present.

    A row is inserted only when no existing row has the same
    ``(shopify_order_id, product_name)`` pair. Existing rows are never updated.

    Parameters
    ----------
    schema, table:
        Destination identifiers. They are interpolated directly into the SQL
        text, so they must be trusted values (not user input).
    data_frame:
        Frame containing every column listed in ``columns``.
    bigquery_warehouse_block:
        Warehouse object entered as a context manager; its ``execute`` method
        runs each statement with ``%s`` positional parameters.

    Raises
    ------
    Exception
        Any failure (connection, SQL, missing column) is printed and re-raised,
        so the task run is marked failed and the flow's retries can apply.
    """
    # Insert order; must match the column list in the MERGE's INSERT clause.
    columns = (
        'shopify_order_id', 'order_name', 'fulfillment', 'seller_id',
        'product_name', 'product_price', 'total_quantity', 'actual_shipping',
        'country', 'date_add', 'date_upd',
    )
    create_operation = f"""
        CREATE TABLE IF NOT EXISTS {schema}.{table} (
            shopify_order_id INT64,
            order_name INT64,
            fulfillment STRING,
            seller_id INT64,
            product_name STRING,
            product_price FLOAT64,
            total_quantity INT64,
            actual_shipping FLOAT64,
            country STRING,
            date_add TIMESTAMP,
            date_upd TIMESTAMP
        );
    """
    # Loop-invariant statement, built once. Parameters: the two match keys
    # first, then one value per entry in `columns`.
    merge_operation = f"""
        MERGE INTO {schema}.{table} AS target
        USING (SELECT %s AS shopify_order_id, %s AS product_name) AS source
        ON target.shopify_order_id = source.shopify_order_id AND target.product_name = source.product_name
        WHEN NOT MATCHED THEN
            INSERT (shopify_order_id, order_name, fulfillment, seller_id, product_name,
                    product_price, total_quantity, actual_shipping, country, date_add, date_upd)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    try:
        with bigquery_warehouse_block as warehouse:
            warehouse.execute(create_operation)
            # NOTE(review): this issues one MERGE statement per row; for large
            # frames a staging-table load followed by a single MERGE would be
            # far cheaper.
            for _, row in data_frame.iterrows():
                values = tuple(row[column] for column in columns)
                warehouse.execute(
                    merge_operation,
                    (row['shopify_order_id'], row['product_name'], *values),
                )
            print(f'The {table} table has been successfully created in BigQuery and new records have been added')
    except Exception as e:
        print(f'Error creating and updating records in BigQuery: {e}')
        raise
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def orders_pipeline():
    """Fetch orders from the API, flatten them, and merge them into BigQuery.

    The four tasks run in sequence, each consuming the previous result.
    ``print`` output is captured as Prefect logs because of ``log_prints=True``.
    The whole flow is retried up to 3 times, 5 seconds apart.
    """
    raw_orders = fetch_new_data(API_URL, headers)
    frame = get_api_data(raw_orders)
    row_count, column_count = frame.shape
    print(f"\nThere are {row_count} rows and {column_count} columns in the Dataframe.")
    load_to_bigquery_from_data_frame(
        schema,
        table,
        prepare_data_for_bigquery(frame),
        bigquery_warehouse_block,
    )
if __name__ == "__main__":
    # Run the flow once, immediately, when this file is executed as a script.
    orders_pipeline()
    # Disabled scheduling alternative. NOTE(review): `serve` is not imported
    # in this module; presumably this was meant to be called on the flow
    # object (e.g. `orders_pipeline.serve(...)`). Confirm before re-enabling.
    # serve(
    # name="mvm_orders_listing",
    # cron="@daily",
    # tags=["mvm"],
    # description="MVM orders listing for Allster online sales.",
    # version="mvm/deployments",
    # )