https://prefect.io logo
Title
r

Robert Esteves

03/13/2023, 3:57 PM
Hi everyone, how do I fix my deployment error so that I can trigger the flow from my local Prefect UI? Here is my code:
#!/usr/bin/env python3

"""
Author: Robert Esteves
Date: 2023-03-12
Purpose: This script reads a CSV file from disk and writes its rows to a JSON file.
"""

import csv
import json
from pathlib import Path
from prefect import flow, task
from prefect.deployments import Deployment

@task(name="Get Source Data From Disk Drive")
def get_data(source_file: str) -> list:
    """
    get_data
    Reads a comma-delimited flat file and returns its records.

    The file is parsed with csv.DictReader, so each record is returned as a
    mapping of column name to value.

    :param source_file: Path of the CSV file to read.
    :return: A list with one mapping per record, in file order. The list is
        empty when the file contains no data rows.
    """

    # newline='' leaves line-ending handling to the csv module, which is
    # required for quoted fields containing embedded newlines to parse
    # correctly.
    with open(source_file, 'r', newline='') as src_file:
        return list(csv.DictReader(src_file, delimiter=','))


@task(name="Store the JSON object in the Disk Drive")
def store_results(source_lst: list, tgt_file_name: str, tgt_file_path: str = 'C:/sandbox_target_folder') -> None:
    """
    store_results
    Serializes the given list as indented JSON text and writes it to a file.

    :param source_lst: Data to serialize.
    :param tgt_file_name: Name of the output file.
    :param tgt_file_path: Folder in which the output file is written.
    :return: None
    """

    # Normalize the folder to forward slashes before appending the file name.
    folder = Path(tgt_file_path).as_posix()
    destination = f"{folder}/{tgt_file_name}"

    # Serialize before opening the file so a failed dump leaves it untouched.
    payload = json.dumps(source_lst, indent=4)

    with open(destination, mode='w') as out_file:
        out_file.write(payload)


# A Flow is a function that contains several tasks
@flow(name="Pipeline Run Demo")
def process_data():
    """
    process_data
    Flow entry point: reads the rows of the sales CSV file through get_data
    and passes them to store_results, which writes them out as JSON.

    :return: None
    """

    sales_rows = get_data('C:/sandbox_source_folder/sales.csv')
    store_results(sales_rows, tgt_file_name='sales.json')


if __name__ == "__main__":

    # Settings passed to the deployment built from the flow.
    deployment_settings = {
        "name": "Prefect Tutorial 3 With Deployment",
        "version": 1,
        "work_queue_name": "Robert 1",
    }

    # Build the deployment from the flow, then apply it.
    Deployment.build_from_flow(flow=process_data, **deployment_settings).apply()

    # To run the flow directly instead of deploying it, call process_data().
image.png
image.png