Christopher Sastropranoto
07/19/2023, 1:18 AM
from database import *
from scrapers.thai_scraper import *
import pandas as pd
from prefect import flow, task
from prefect.deployments import Deployment
# Flow-level retry settings passed to the ``thai_extract`` Prefect flow.
# NOTE(review): the original comment read "30 MIN (REPORTED IN SECONDS)", but the
# value is 10 seconds; 30 minutes would be 1800. Confirm which one is intended.
RETRY_DELAY = 10  # seconds to wait between flow retries
RETRIES = 5  # number of retries after a failed flow run
@task
def extract_forward(Outlet) -> pd.DataFrame:
    """Run one extraction pass for a single news outlet.

    Args:
        Outlet: A scraper *class* (not an instance). It is instantiated here
            with no arguments, and ``collect_data()`` is then called on it.

    Returns:
        The result of the scraper's ``return_dataset()``. It is annotated as a
        DataFrame; ``thai_extract`` relies on it exposing ``.empty``.
    """
    scraper = Outlet()
    scraper.collect_data()
    return scraper.return_dataset()
@flow(name="extract", retries=RETRIES, retry_delay_seconds=RETRY_DELAY)
def thai_extract() -> pd.DataFrame:
    """Extract Thai news data and return the compiled dataframe.

    Every outlet listed below is scraped via ``extract_forward``. Each non-empty
    result is registered as a named source on a ``TemporaryDatabase`` for
    'Thailand'. The database is then validated with ``today_date=False`` and
    compiled, and the compiled result is returned.
    """
    database = TemporaryDatabase({}, 'Thailand')

    # Source name -> scraper class. Currently only ThaiPBSWorld.
    outlets = {'ThaiPBSWorld': ThaiPBS}
    for source_name, outlet_cls in outlets.items():
        frame = extract_forward(outlet_cls)
        # Only register sources that actually returned rows.
        if not frame.empty:
            database.add_source(source_name, frame)

    database.validate_data(today_date=False)
    database.compile()
    return database.fetch_compiled()
@task(name="transform")
def transform(dataset) -> dict:
    """Split the extracted dataframe into a news table and a source table.

    Args:
        dataset: The dataframe produced by ``thai_extract``. It must contain
            the columns ``outlet``, ``country_source``, ``url``, ``author``,
            ``date``, ``tags`` and ``country``. ``DataFrame.drop`` raises
            ``KeyError`` if any of them is missing.

    Returns:
        A dict holding two new dataframes (``dataset`` itself is not modified):
        ``'News'``: ``dataset`` without ``outlet``, ``country_source`` and ``url``.
        ``'Source'``: ``dataset`` without ``author``, ``date``, ``tags`` and ``country``.
    """
    news_dataset = dataset.drop(columns=['outlet', 'country_source', 'url'])
    source_dataset = dataset.drop(columns=['author', 'date', 'tags', 'country'])
    return {'News': news_dataset, 'Source': source_dataset}
@task(name="load")
def load(dataset: dict) -> dict:
    """Write the transformed tables to CSV files (a database target is planned).

    Args:
        dataset: The dict returned by ``transform``, with ``'News'`` and
            ``'Source'`` dataframes.

    Returns:
        The same ``dataset`` dict, unchanged.
    """
    # The paths are relative, so the files are written to the process's
    # current working directory. NOTE(review): when run by a Prefect
    # agent/worker, this may not be the directory containing this script.
    dataset['News'].to_csv('news.csv', index=False)
    dataset['Source'].to_csv('news_sources.csv', index=False)
    return dataset
@flow(log_prints=True)
def complete_pipeline():
    """Run the whole ETL pipeline: extract Thai news, transform it, load it to CSV."""
    raw_frame = thai_extract()
    tables = transform(raw_frame)
    load(tables)
if __name__ == "__main__":
    # Runs only when this file is executed as a script, not when it is imported.
    complete_pipeline()
Deceivious
07/19/2023, 9:29 AM
Christopher Sastropranoto
07/19/2023, 3:06 PM
Deceivious
07/19/2023, 6:59 PM
Christopher Sastropranoto
07/19/2023, 7:03 PM
Deceivious
07/19/2023, 7:04 PM
Christopher Sastropranoto
07/19/2023, 7:06 PM
Deceivious
07/19/2023, 7:07 PM
Christopher Sastropranoto
07/19/2023, 7:14 PM
Deceivious
07/19/2023, 7:14 PM
Christopher Sastropranoto
# 07/19/2023, 7:15 PM
@task(name="load")
def load(dataset: dict) -> dict:
    """Write every table in ``dataset`` to ``<key>.csv`` (a database target is planned).

    Args:
        dataset: The dict returned by ``transform``. Each value is a dataframe,
            and each key becomes its CSV file name (e.g. ``'News'`` -> ``News.csv``).

    Returns:
        The same ``dataset`` dict, unchanged.
    """
    # Imported locally: neither name appears in the visible top-of-file imports.
    import os

    from prefect import get_run_logger

    # Log the working directory, because the relative CSV paths below are
    # resolved against it.
    get_run_logger().info(os.getcwd())
    for table_name, frame in dataset.items():
        frame.to_csv(f'{table_name}.csv', index=False)
    return dataset
Deceivious
07/19/2023, 7:23 PM
Christopher Sastropranoto
07/19/2023, 7:30 PM