madhav
03/11/2022, 10:17 PMKevin Kho
03/11/2022, 10:22 PMmadhav
03/11/2022, 10:26 PMimport datetime
from prefect import Flow
from prefect.engine.state import Success
from sqlalchemy import select
from data_models.orm import Workflow
from models.DatabaseConnection import DatabaseConnection
from models.SilverFileModel import SilverFileModel
from models.RuntimeParameters import RuntimeParameters
from tasks.local_csv_to_dataframe import local_csv_to_dataframe
from tasks.silver_file_to_dataframe import silver_file_to_dataframe
from tasks.validate_elec_ami_dataframe_syntax import validate_elec_ami_dataframe_syntax
from tasks.validate_elec_ami_dataframe_contents import (
validate_elec_ami_dataframe_contents,
)
from tasks.create_elec_ami_meter_records import create_elec_ami_meter_records
# set up the database session
db_conn = DatabaseConnection()
session = db_conn.session
# define the workflow name and process namegit pull
flow_name = "flow-s2g-elec-ami-csv-to-meter-records"
processor_name = "prefect-orchestrator"
# get the set of SilverFiles to process
model = SilverFileModel(session, "ELECTRICITY-AMI")
silver_files = model.get_unprocessed()
workflow = Workflow()
workflow.flow_name = flow_name
workflow.created_by = processor_name
workflow.updated_by = processor_name
session.add(workflow)
session.commit()
with Flow(flow_name) as flow:
# run the tasks
dataframe = silver_file_to_dataframe(workflow.id, silver_files[0])
print("---------------**" + str(type(dataframe)))
flow.run()
Kevin Kho
03/11/2022, 10:36 PMmadhav
03/11/2022, 10:52 PMKevin Kho
03/11/2022, 10:57 PM