#!/usr/bin/env python3
# Robert Esteves
# 03/13/2023, 3:57 PM
"""
Author: Robert Esteves
Date: 2023-03-12
Purpose: This script is designed to ingest a csv file and return a json object
"""
import csv
import json
from pathlib import Path
from prefect import flow, task
from prefect.deployments import Deployment
@task(name="Get Source Data From Disk Drive")
def get_data(source_file: str) -> list:
    """
    get_data
    Read a comma-delimited flat file and return its rows as a list of dicts.

    The first line of the file supplies the column names; every following
    row becomes one dict keyed by those names, exactly as csv.DictReader
    yields it (ordinary cell values are strings).

    :param source_file: Path of the CSV file to read.
    :return: A list with one dict per data row; empty when the file has no
        data rows.
    :raises OSError: If the file cannot be opened (e.g. it does not exist).
    """
    # newline='' is what the csv module requires of its input file: without
    # it, universal-newline translation rewrites line endings that sit inside
    # quoted fields before the parser ever sees them.
    with open(source_file, 'r', newline='') as src_file:
        return list(csv.DictReader(src_file, delimiter=','))
@task(name="Store the JSON object in the Disk Drive")
def store_results(source_lst: list, tgt_file_name: str, tgt_file_path: str = 'C:/sandbox_target_folder') -> None:
    """
    store_results
    Serialise a list to JSON (4-space indent) and write it to disk.

    The destination is tgt_file_path joined with tgt_file_name. Any missing
    folder on the way to the destination is created first, and an existing
    file of the same name is overwritten.

    :param source_lst: JSON-serialisable list to store (e.g. a list of row dicts).
    :param tgt_file_name: Name of the output file, relative to tgt_file_path.
    :param tgt_file_path: Folder the output file is written into.
    :return: None
    :raises TypeError: If source_lst holds values json.dumps cannot serialise.
    :raises OSError: If the folder cannot be created or the file cannot be written.
    """
    tgt_file = Path(tgt_file_path) / tgt_file_name

    # Serialise before touching the disk so a serialisation error cannot
    # leave an empty or truncated file behind.
    results = json.dumps(source_lst, indent=4)

    # A missing target folder used to surface as FileNotFoundError on open().
    tgt_file.parent.mkdir(parents=True, exist_ok=True)

    # json.dumps escapes non-ASCII by default, so pinning UTF-8 keeps the
    # bytes on disk independent of the machine's locale encoding.
    with open(tgt_file, 'w', encoding='utf-8') as tgt:
        tgt.write(results)
# A Flow is a function that contains several tasks
@flow(name="Pipeline Run Demo")
def process_data(source_file: str = 'C:/sandbox_source_folder/sales.csv', tgt_file_name: str = 'sales.json'):
    """
    process_data
    Run the pipeline: read the source CSV with get_data, then hand the rows
    to store_results to be written out as JSON.

    Both arguments default to the values that used to be hard-coded, so
    calling process_data() with no arguments behaves as before; passing
    them lets the same flow be reused for other files.

    :param source_file: Path of the CSV file to ingest.
    :param tgt_file_name: Name of the JSON file to produce; it is written to
        the default target folder of store_results.
    :return: None
    """
    data = get_data(source_file)
    store_results(data, tgt_file_name)
if __name__ == "__main__":
    # Describe the deployment of the process_data flow in one place, then
    # build it and register it via apply().
    deployment_options = {
        "flow": process_data,
        "name": "Prefect Tutorial 3 With Deployment",
        "version": 1,
        "work_queue_name": "Robert 1",
    }
    Deployment.build_from_flow(**deployment_options).apply()
    # To run the flow directly instead of registering a deployment,
    # call process_data() here.