# ask-community
j
Please, I'm new to Prefect and I'd like to know: what's the best way to automate my ETL work in the cloud? I'm finding it difficult to fully understand the official documentation.
r
What have you done so far, my friend? It's difficult to provide advice without knowing the full details/scope of your project. I suggest you come back with more than a sentence, perhaps code samples or an open repository you're working with, and pose your question again.
j
@Ryan thanks. Here's the code I'm currently working on:
```python
import pandas as pd
import httpx
from prefect import flow, task, variables
from prefect_gcp.bigquery import BigQueryWarehouse
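# Load the warehouse block saved in the Prefect workspace; this assumes a
# BigQueryWarehouse block named "bigquery-dwh" was already created via the UI or CLI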
bigquery_warehouse_block = BigQueryWarehouse.load("bigquery-dwh")


API_KEY = variables.get('api_key')
API_URL = variables.get('api_url') 
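# Note: variables.get returns None if a variable isn't set, which would make the
# "Bearer " concatenation below raise a TypeError; passing a default, e.g.
# variables.get('api_key', default=''), may be safer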

headers = {
    "Authorization": "Bearer " + API_KEY,
    "Content-Type": "application/json"
}

table = 'orders'
schema = 'multi_vendor'


@task
async def fetch_new_data(api_url, headers):
    # Make the request to the API using httpx's async client
    async with httpx.AsyncClient() as client:
        response = await client.get(api_url, headers=headers)

        # Raise on 4xx/5xx responses; otherwise parse the JSON payload
        response.raise_for_status()
        data = response.json()
        return data.get('orders', [])

@task
def get_api_data(data_from_api) -> pd.DataFrame:
    try:
        mvm = []
        for result in data_from_api:
            for item in result['line_items']:
                sales = {
                    'shopify_order_id': int(item['shopify_order_id']),
                    'order_name': int(result['order_name'].replace('#', '')),
                    'fulfillment': str(result['fulfillment']),
                    'seller_id': int(item['seller_id']),
                    'product_name': str(item['product_name']),
                    'product_price': float(item['product_price']),
                    'total_quantity': int(item['total_quantity']),
                    'actual_shipping': float(item['actual_shipping']),
                    'country': str(result['country']),
                    'date_add': pd.to_datetime(item['date_add'], errors='coerce'),
                    'date_upd': pd.to_datetime(item['date_upd'], errors='coerce'),
                }
                # Append inside the inner loop so every line item is kept,
                # not just the last one per order
                mvm.append(sales)
        return pd.DataFrame(mvm)
    except Exception as e:
        print(f'Error processing API DataFrame: {e}')
        raise

@task
def prepare_data_for_bigquery(df: pd.DataFrame) -> pd.DataFrame:
    try:
        # Replace NaT with a sentinel timestamp and convert both columns to UTC
        sentinel = '1970-01-01 00:30:37'
        df['date_add'] = pd.to_datetime(df['date_add'].apply(lambda x: sentinel if pd.isna(x) else x), utc=True)
        df['date_upd'] = pd.to_datetime(df['date_upd'].apply(lambda x: sentinel if pd.isna(x) else x), utc=True)
        return df
    except Exception as e:
        print(f'Error replacing NaT values: {e}')
        raise

@task
def load_to_bigquery_from_data_frame(schema, table, data_frame, bigquery_warehouse_block):
    try:
        # Assuming bigquery_warehouse_block is a context manager for BigQuery connection
        with bigquery_warehouse_block as warehouse:
            create_operation = f"""
                CREATE TABLE IF NOT EXISTS {schema}.{table} (
                    shopify_order_id INT64,
                    order_name INT64,
                    fulfillment STRING,
                    seller_id INT64,
                    product_name STRING,
                    product_price FLOAT64,
                    total_quantity INT64,
                    actual_shipping FLOAT64,
                    country STRING,
                    date_add TIMESTAMP,
                    date_upd TIMESTAMP
                );
            """

            warehouse.execute(create_operation)

            # Prepare and execute merge operation for each row in the DataFrame
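            # NOTE: one MERGE per row is simple but slow for large frames; a staging
            # table plus a single set-based MERGE (or the BigQuery client's
            # load_table_from_dataframe) would likely scale better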
            for _, row in data_frame.iterrows():
                merge_operation = f"""
                    MERGE INTO {schema}.{table} AS target
                    USING (SELECT %s AS shopify_order_id, %s AS product_name) AS source
                    ON target.shopify_order_id = source.shopify_order_id AND target.product_name = source.product_name
                    WHEN NOT MATCHED THEN 
                        INSERT (shopify_order_id, order_name, fulfillment, seller_id, product_name, 
                                product_price, total_quantity, actual_shipping, country, date_add, date_upd)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """
                insert_values = (
                    row['shopify_order_id'], row['product_name'],
                    row['shopify_order_id'], row['order_name'], row['fulfillment'],
                    row['seller_id'], row['product_name'], row['product_price'],
                    row['total_quantity'], row['actual_shipping'], row['country'], row['date_add'], row['date_upd']
                )
                warehouse.execute(merge_operation, insert_values)
        print(f'The {table} table has been successfully created in BigQuery and new records have been added')

    except Exception as e:
        print(f'Error creating and updating records in BigQuery: {e}')

@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def orders_pipeline():
    data_from_api = fetch_new_data(API_URL, headers)
    data_frame = get_api_data(data_from_api)
    print(f"\nThere are {data_frame.shape[0]} rows and {data_frame.shape[1]} columns in the DataFrame.")
    prepared_data_frame = prepare_data_for_bigquery(data_frame)
    load_to_bigquery_from_data_frame(schema, table, prepared_data_frame, bigquery_warehouse_block)

if __name__ == "__main__":
    orders_pipeline()
    # To run on a schedule instead of once, serve the flow:
    # orders_pipeline.serve(
    #     name="mvm_orders_listing",
    #     cron="@daily",
    #     tags=["mvm"],
    #     description="MVM orders listing for Allster online sales.",
    #     version="mvm/deployments",
    # )
```
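The commented-out block at the end is where I want to get to: from what I understand of the docs, `flow.serve` creates a deployment from the flow and keeps the process alive so it runs on the schedule. A minimal sketch of what I think the scheduled entrypoint would look like (assuming Prefect 2.10+ and a machine or container that stays running and is connected to Prefect Cloud or a self-hosted server):
```python
# Minimal sketch: run this instead of calling orders_pipeline() directly.
# flow.serve registers a deployment and keeps this process alive, polling
# the Prefect API for scheduled runs (assumes Prefect 2.10+).
if __name__ == "__main__":
    orders_pipeline.serve(
        name="mvm_orders_listing",
        cron="0 0 * * *",  # daily at midnight, i.e. the cron "@daily" alias
        tags=["mvm"],
        description="MVM orders listing for Allster online sales.",
    )
```
Does that look like a reasonable direction?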