# prefect-server
Hi all, I have a task that returns a dataframe, but instead it's returning a FunctionTask. Could it be something with a task definition?
That sounds like it was used in the flow incorrectly. Could you show me the code?
import datetime
from prefect import Flow
from prefect.engine.state import Success
from sqlalchemy import select

from data_models.orm import Workflow

from models.DatabaseConnection import DatabaseConnection
from models.SilverFileModel import SilverFileModel
from models.RuntimeParameters import RuntimeParameters

from tasks.local_csv_to_dataframe import local_csv_to_dataframe
from tasks.silver_file_to_dataframe import silver_file_to_dataframe
from tasks.validate_elec_ami_dataframe_syntax import validate_elec_ami_dataframe_syntax
from tasks.validate_elec_ami_dataframe_contents import (
from tasks.create_elec_ami_meter_records import create_elec_ami_meter_records

# set up the database session
db_conn = DatabaseConnection()
session = db_conn.session

# define the workflow name and processor name
flow_name = "flow-s2g-elec-ami-csv-to-meter-records"
processor_name = "prefect-orchestrator"

# get the set of SilverFiles to process
model = SilverFileModel(session, "ELECTRICITY-AMI")
silver_files = model.get_unprocessed()
workflow = Workflow()
workflow.flow_name = flow_name
workflow.created_by = processor_name
workflow.updated_by = processor_name

with Flow(flow_name) as flow:
    # run the tasks
    dataframe = silver_file_to_dataframe(workflow.id, silver_files[0])
    print("---------------**" + str(type(dataframe)))

Reading through it, could the problem be the second argument to the task, i.e. that it is a non-serializable object?
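This is because the print runs at Flow creation time, not at runtime. Inside the with Flow(...) block, calling a task only registers it in the flow and returns a Task object (here a FunctionTask), not the dataframe the task will eventually produce. To see the real type, log it inside a task so that the check is deferred to runtime as well.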
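To make the build-time vs. runtime distinction concrete, here is a minimal sketch in plain Python. It is a toy model of deferred execution, not Prefect's actual implementation, and every name in it (DeferredCall, MiniFlow, load_rows, type_name, readings.csv) is hypothetical. It only shows why inspecting a value while the flow is being built sees a placeholder, while inspecting it inside a task sees the real object.

```python
# Toy model of deferred execution; NOT Prefect's implementation.
# All names here (DeferredCall, MiniFlow, load_rows, type_name) are hypothetical.


class DeferredCall:
    """Placeholder recorded at build time; holds the function and its arguments."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args


class MiniFlow:
    """Collects DeferredCall objects and executes them in order on run()."""

    def __init__(self):
        self.calls = []

    def add(self, fn, *args):
        call = DeferredCall(fn, args)
        self.calls.append(call)
        return call  # a placeholder, not fn's return value

    def run(self):
        results = {}
        for call in self.calls:
            # Swap placeholder arguments for the results computed so far.
            args = [
                results[id(a)] if isinstance(a, DeferredCall) else a
                for a in call.args
            ]
            results[id(call)] = call.fn(*args)
        return results


def load_rows(path):
    # Stand-in for a task that returns a dataframe.
    return [{"meter": 1}, {"meter": 2}]


def type_name(obj):
    # Inspecting the value inside a "task" sees the real runtime object.
    return type(obj).__name__


flow = MiniFlow()
rows = flow.add(load_rows, "readings.csv")
build_time_type = type(rows).__name__  # inspected while building the flow
checked = flow.add(type_name, rows)

results = flow.run()
runtime_type = results[id(checked)]  # inspected while running the flow

print(build_time_type)
print(runtime_type)
```

In the flow above, the analogous change would be to move the type check out of the with Flow(...) block and into a task that receives the dataframe as an argument. In Prefect 1.x a task can get a logger via prefect.context.get("logger") for this.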
got it, thank you. that makes sense. we updated the code and now it works!