Christopher Sastropranoto

07/19/2023, 1:18 AM
Hi everyone, I'm new to Prefect so please bear with me. I am trying to deploy a model I made and I cannot seem to save my results to a CSV file (locally) even though Prefect Cloud says that all flows ran perfectly. Could you take a look at my code to see what is wrong? I suspect it may have something to do with the load function because the data that I want is stored in the parameters tab on Prefect Cloud (however, I want this data stored as a csv file locally). Thank you! [SOLVED: The solution was to specify the full file path instead of using the relative file path]
from database import *
from scrapers.thai_scraper import *
import pandas as pd
from prefect import flow, task
from prefect.deployments import Deployment

RETRY_DELAY = 10 # RETRY DELAY IN SECONDS (30 MIN WOULD BE 1800)
RETRIES = 5

@task
def extract_forward(Outlet) -> pd.DataFrame:
    """Do one forward iteration of the extract process. Note: Outlet is a class name.
    Returns a dataframe containing a populated database.
    """
    
    outlet = Outlet()
    outlet.collect_data()
    dataset = outlet.return_dataset()
    
    return dataset
    
@flow(name = "extract", retries = RETRIES, retry_delay_seconds = RETRY_DELAY)
def thai_extract() -> pd.DataFrame:
    """Perform the extraction process and return a dataframe for Thailand."""
    
    dataset = TemporaryDatabase({}, 'Thailand')
    
    # ThaiPBSWorld Data
    thaipbs = extract_forward(ThaiPBS)
    if not thaipbs.empty:
        dataset.add_source('ThaiPBSWorld', thaipbs)
    dataset.validate_data(today_date = False)
    dataset.compile()
    
    return dataset.fetch_compiled()

@task(name = "transform")
def transform(dataset: pd.DataFrame) -> dict:
    """Take in a Pandas dataset, perform the transformation process and return a dict of two dataframes ('News' and 'Source')."""
    
    news_dataset = dataset.drop(columns = ['outlet', 'country_source', 'url'])
    source_dataset = dataset.drop(columns = ['author', 'date', 'tags', 'country'])

    return {'News': news_dataset, 'Source': source_dataset}

@task(name = "load")
def load(dataset: dict) -> dict:
    """Load the dataset into csv files (Add database later!).
    
    Note that the dataset here has the same format as the one returned by the transform method."""
    
    dataset['News'].to_csv('news.csv', index = False) 
    dataset['Source'].to_csv('news_sources.csv', index = False)
    
    return dataset

@flow(log_prints = True)
def complete_pipeline():
    """The complete ETL pipeline."""
    
    dataset = thai_extract()
    dataset = transform(dataset)
    dataset = load(dataset)

if __name__ == "__main__":
    complete_pipeline()

Deceivious

07/19/2023, 9:29 AM
Missing information. How is the deployment done? Where are the agents running? If it's Docker, the file is inside the container.

Christopher Sastropranoto

07/19/2023, 3:06 PM
The deployment was done using a virtual environment. The agent was also running on a virtual environment. I'm new to data engineering so thank you for your patience.

Deceivious

07/19/2023, 6:59 PM
Unsure. Maybe try logging the working directory. If the flows complete, the file should be somewhere.

Christopher Sastropranoto

07/19/2023, 7:03 PM
It's weird because when I run it in my IDE, it successfully creates the csv files I want. It's only when I deploy that there's a problem.
As for the logs, everything seems to run successfully.
Screenshot 2023-07-19 at 3.03.54 PM.png

Deceivious

07/19/2023, 7:04 PM
Yes, in the load function, log the current working directory,
then deploy and run the code to see where the files are actually being saved.

Christopher Sastropranoto

07/19/2023, 7:06 PM
This is what I'm getting when I look at the log for the load function.

Deceivious

07/19/2023, 7:07 PM
prefect.get_run_logger().info(os.getcwd())
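To spell out why this is worth logging: a relative filename such as 'news.csv' is resolved against the process's current working directory, and that can differ between an IDE run and a run picked up by an agent. Below is a minimal sketch using only the standard library. The helper name `describe_output_target` is made up for illustration, and inside a Prefect task the values would go to `get_run_logger().info(...)` instead of `print`.

```python
import os
from pathlib import Path


def describe_output_target(filename: str) -> dict:
    """Report the working directory and where `filename` would be written."""
    return {
        "cwd": os.getcwd(),
        # resolve() turns a relative name into an absolute path,
        # using the current working directory as the base
        "resolved": str(Path(filename).resolve()),
    }


# 'news.csv' is the relative name used in the load function above
info = describe_output_target("news.csv")
print(info["cwd"])
print(info["resolved"])
```

If the resolved path printed during a deployed run is not the folder you are checking, the files are probably being written somewhere else.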

Christopher Sastropranoto

07/19/2023, 7:14 PM
Here are the logs when I run the file; however, the csv files still aren't created.
sorry for being slow 🙏

Deceivious

07/19/2023, 7:14 PM
Is the folder there tho?

Christopher Sastropranoto

07/19/2023, 7:15 PM
Yep, I can access the folder.
The folder shown above seems to be a direct copy of my working directory.
Just in case, here is how I modified my original load function:
import os
from prefect import get_run_logger, task

@task(name = "load")
def load(dataset: dict) -> dict:
    """Load the dataset into csv files (Add database later!).
    
    Note that the dataset here has the same format as the one returned by the transform method."""
    
    get_run_logger().info(os.getcwd())
    for item in dataset:
        dataset[item].to_csv(f'{item}.csv', index = False)
    
    return dataset

Deceivious

07/19/2023, 7:23 PM
Out of ideas. Maybe try specifying the full path instead of a relative one?
Unless the dataset is empty 😄

Christopher Sastropranoto

07/19/2023, 7:30 PM
Yes! The full file path worked! Thank you very much for this! You're the best 😁
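For anyone landing here later, here is one possible way to build the full path, as a sketch rather than the exact code used in this thread. The `OUTPUT_DIR` folder and the `csv_path` helper are hypothetical names chosen for illustration; the thread does not say which folder was actually used.

```python
from pathlib import Path

# Hypothetical output folder (an assumption, not taken from the thread)
OUTPUT_DIR = Path.home() / "prefect_output"


def csv_path(name: str, output_dir: Path = OUTPUT_DIR) -> Path:
    """Return an absolute path for '<name>.csv', creating the folder if needed."""
    output_dir = output_dir.expanduser().resolve()
    output_dir.mkdir(parents=True, exist_ok=True)
    return output_dir / f"{name}.csv"
```

Inside the load task, the write would then look like `dataset[item].to_csv(csv_path(item), index = False)`, so the destination no longer depends on the working directory the agent happens to start in.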