#prefect-server

madhav

03/11/2022, 10:17 PM
Hi all, I have a task that returns a dataframe, but instead it's returning a FunctionTask. Could it be something with a task definition?

Kevin Kho

03/11/2022, 10:22 PM
That sounds like it was used in the flow incorrectly. Could you show me the code?

madhav

03/11/2022, 10:26 PM
import datetime
from prefect import Flow
from prefect.engine.state import Success
from sqlalchemy import select

from data_models.orm import Workflow

from models.DatabaseConnection import DatabaseConnection
from models.SilverFileModel import SilverFileModel
from models.RuntimeParameters import RuntimeParameters

from tasks.local_csv_to_dataframe import local_csv_to_dataframe
from tasks.silver_file_to_dataframe import silver_file_to_dataframe
from tasks.validate_elec_ami_dataframe_syntax import validate_elec_ami_dataframe_syntax
from tasks.validate_elec_ami_dataframe_contents import (
    validate_elec_ami_dataframe_contents,
)
from tasks.create_elec_ami_meter_records import create_elec_ami_meter_records

# set up the database session
db_conn = DatabaseConnection()
session = db_conn.session

# define the workflow name and processor name
flow_name = "flow-s2g-elec-ami-csv-to-meter-records"
processor_name = "prefect-orchestrator"

# get the set of SilverFiles to process
model = SilverFileModel(session, "ELECTRICITY-AMI")
silver_files = model.get_unprocessed()
workflow = Workflow()
workflow.flow_name = flow_name
workflow.created_by = processor_name
workflow.updated_by = processor_name
session.add(workflow)
session.commit()

with Flow(flow_name) as flow:
    # run the tasks
    dataframe = silver_file_to_dataframe(workflow.id, silver_files[0])
    print("---------------**" + str(type(dataframe)))


flow.run()
Reading through it, could the problem be the second argument to the task, i.e. that it is a non-serializable object?

Kevin Kho

03/11/2022, 10:36 PM
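This is because the print runs at Flow creation time, not at runtime. Inside the with Flow(...) block, calling a task only adds it to the flow and returns the task object, so the printed type is FunctionTask rather than a dataframe. If you log the type inside a task instead, the check is deferred to runtime and shows the actual type.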
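To make the build-time versus runtime distinction concrete, here is a minimal toy sketch in plain Python. It is not Prefect's implementation: `ToyTask`, `toy_task`, `load_rows`, and `record_type` are hypothetical names made up for illustration, and a list of dicts stands in for the dataframe.

```python
class ToyTask:
    """Hypothetical stand-in for a task node (not Prefect's FunctionTask)."""

    def __init__(self, fn, args=()):
        self.fn = fn
        self.args = args

    def __call__(self, *args):
        # While the flow is being built, calling a task only records the call
        # and hands back another task node. The wrapped function does not run.
        return ToyTask(self.fn, args)

    def run(self):
        # At runtime, upstream task nodes are resolved to real values first,
        # then the wrapped function is finally executed.
        resolved = [a.run() if isinstance(a, ToyTask) else a for a in self.args]
        return self.fn(*resolved)


def toy_task(fn):
    # Decorator that wraps a plain function in a ToyTask.
    return ToyTask(fn)


@toy_task
def load_rows(path):
    # Stands in for a task that would return a dataframe.
    return [{"meter": 1, "kwh": 3.2}]


seen_types = []


@toy_task
def record_type(value):
    # Runs at runtime, so it receives the real value, not a task node.
    seen_types.append(type(value).__name__)
    return value


# "Flow creation time": nothing has executed yet, so this is a task node.
rows = load_rows("example.csv")
build_time_type = type(rows).__name__

# "Runtime": the graph executes and the inspecting task sees the real value.
checked = record_type(rows)
result = checked.run()
run_time_type = seen_types[-1]

print(build_time_type, run_time_type)
```

In the flow above, the analogous change would be to pass `dataframe` into a second task that logs `type(...)`, rather than calling `print` directly inside the `with Flow(...)` block.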

madhav

03/11/2022, 10:52 PM
got it, thank you. that makes sense. we updated the code and now it works!

Kevin Kho

03/11/2022, 10:57 PM
Nice!