# ask-community
Hey guys, hopefully this is an easy, noob-level question: I want to get the count of records in a dataframe returned from a task and pass it through to another task, like so:
with Flow('test_flow', executor=LocalExecutor()) as flow:
    df = sometask_that_returns_a_df()
    count_of_records_in_df = len(df.index)
    execute_task_2(count_of_records_in_df)
Obvs this fails when the flow is built, because what sometask_that_returns_a_df() returns inside the Flow block is a FunctionTask, not a dataframe.
Hi @Adam Everington, you would have to wrap it in a task; then you can use it within the with Flow() block:
@task
def get_len_of_df(df):
    return len(df.index) 

with Flow('test_flow',executor=LocalExecutor()) as flow:
    df = sometask_that_returns_a_df()
    count_of_records_in_df = get_len_of_df(df)
    execute_task_2(count_of_records_in_df)
awesome, thanks @Anna Geller!
Seems obvious when you say it.... duh!
If I created a class like so:
class DTO:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.data_count = len(self.data.index)
and had this as the return type of a task:
@task
def some_task() -> DTO:
    thisDTO = DTO(data_frame)
    return thisDTO
could I then access that within the flow like so:
with Flow('test_flow', executor=LocalExecutor()) as flow:
    dto1 = some_task()
    task2(dto1.data_count)
Would I face the same issues there?
It depends on several factors: what type of storage you use and whether you use Dask. What if your DTO class were also a Prefect task? Here is a docs page showing how to subclass the Task class:
from prefect import Task

class DTO(Task):
    def run(self, data, data_count):
        # do something with data and data_count
        return data
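On the dto1.data_count question: inside the Flow block, dto1 is a task placeholder rather than a DTO instance, just like df in the first example, so the attribute has to be read inside a task at run time. A minimal sketch of that wrapping pattern, keeping DTO as the plain class from the question. Assumptions: get_data_count is a hypothetical helper name, and a list of rows stands in for the DataFrame so the snippet runs without pandas or Prefect installed.

```python
class DTO:
    """Stand-in for the DTO in the question; a list replaces the DataFrame."""

    def __init__(self, data):
        self.data = data
        self.data_count = len(data)  # len(data.index) for a DataFrame


def get_data_count(dto):
    """Hypothetical helper: read the attribute from the real DTO object."""
    return dto.data_count


# In a Prefect 1.x flow this helper would carry the @task decorator, so the
# attribute is read when the flow runs rather than while it is being built:
#
#   with Flow('test_flow', executor=LocalExecutor()) as flow:
#       dto1 = some_task()
#       task2(get_data_count(dto1))

rows = [{"id": 1}, {"id": 2}, {"id": 3}]
print(get_data_count(DTO(rows)))  # prints 3
```

This mirrors get_len_of_df from earlier in the thread; only the thing being unwrapped changes.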
You can access DTO as long as it is serializable with cloudpickle, which I think it should be.
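A quick way to check the serialization point is a dumps/loads round trip. This is only a sketch under assumptions: is_serializable is a hypothetical helper, a list stands in for the DataFrame, and the snippet falls back to the standard-library pickle if cloudpickle is not installed.

```python
import pickle
import threading

try:
    import cloudpickle  # third-party serializer named in the thread
except ImportError:
    cloudpickle = None  # assumption: fall back to stdlib pickle for this sketch


class DTO:
    """Stand-in for the DTO above; a list replaces the DataFrame."""

    def __init__(self, data):
        self.data = data
        self.data_count = len(data)  # len(data.index) for a DataFrame


def is_serializable(obj) -> bool:
    """Hypothetical helper: True if obj survives a dumps/loads round trip."""
    dumps = cloudpickle.dumps if cloudpickle is not None else pickle.dumps
    try:
        pickle.loads(dumps(obj))
    except Exception:
        return False
    return True


print(is_serializable(DTO([1, 2, 3])))           # DTO holding plain data
print(is_serializable(DTO([threading.Lock()])))  # DTO holding a lock object
```

A DTO that only holds data should pass; one that holds something like a lock or an open connection would not, which is the case to watch for.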