hey guys, hopefully this is an easy / noob level resolution that's required:
I want to get the count of records returned in a dataframe from a task to pass through to another task like so:
with Flow('test_flow', executor=LocalExecutor()) as flow:
    df = sometask_that_returns_a_df()
    count_of_records_in_df = len(df.index)
    execute_task_2(count_of_records_in_df)
Obvs building the flow fails, because inside the flow block sometask_that_returns_a_df() returns a FunctionTask rather than a DataFrame
Anna Geller
10/20/2021, 8:50 AM
Hi @Adam Everington, you would have to wrap it in a task; then you can use it within the with Flow() block:
@task
def get_len_of_df(df):
    return len(df.index)

with Flow('test_flow', executor=LocalExecutor()) as flow:
    df = sometask_that_returns_a_df()
    count_of_records_in_df = get_len_of_df(df)
    execute_task_2(count_of_records_in_df)
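To see why the wrapper is needed, here is a toy model of deferred execution. It is not Prefect code and makes no claim about Prefect internals: Deferred, the task decorator, load_rows, get_len and report are all hypothetical stand-ins, and a plain list replaces the DataFrame. The point it sketches is that while the flow is being built, a task call hands back a placeholder, so len() has nothing to measure until run time.

```python
class Deferred:
    """Hypothetical placeholder for a value that only exists at run time."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def resolve(self):
        # Resolve upstream placeholders first, then call the wrapped function.
        args = [a.resolve() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*args)


def task(fn):
    """Toy decorator: calling the decorated function defers it instead of running it."""
    def build(*args):
        return Deferred(fn, *args)
    return build


@task
def load_rows():
    return [{"id": 1}, {"id": 2}, {"id": 3}]


@task
def get_len(rows):
    return len(rows)


@task
def report(count):
    return f"{count} records"


rows = load_rows()  # a Deferred, not a list

# Build time: the placeholder has no length, so len() raises TypeError.
try:
    len(rows)
    build_time_len_worked = True
except TypeError:
    build_time_len_worked = False

# Run time: wrapping len() in its own task defers it until the data exists.
result = report(get_len(rows)).resolve()
```

Here build_time_len_worked ends up False, while result is computed only once resolve() walks the chain, which is the same shape as get_len_of_df above.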
Adam Everington
10/20/2021, 9:10 AM
awesome, thanks @Anna Geller!
Adam Everington
10/20/2021, 9:11 AM
Seems obvious when you say it.... duh!
Adam Everington
10/20/2021, 9:18 AM
If I created a class like so:
import pandas as pd

class DTO:
    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.data_count = len(self.data.index)
and had this as the return type of a task:
@task
def some_task() -> DTO:
    # data_frame is assumed to be a DataFrame defined elsewhere
    thisDTO = DTO(data_frame)
    return thisDTO
could I then access that within the flow like so:
with Flow('test_flow', executor=LocalExecutor()) as flow:
    dto1 = some_task()
    task2(dto1.data_count)
would I face the same issues there?
Anna Geller
10/20/2021, 9:24 AM
It depends on many factors: what type of storage you use, and whether you use Dask. What if your DTO class were also a Prefect task? Here is a docs page showing how to subclass the Task class:
from prefect import Task

class DTO(Task):
    def run(self, data, data_count):
        # do something with data and data_count
        return data
Kevin Kho
10/20/2021, 1:35 PM
You can access DTO as long as it is serializable with
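A quick way to check whether a class like DTO serializes at all is a dump/load round trip. The sketch below uses stdlib pickle purely as a stand-in, since the serializer is not named here, and other serializers may accept more or fewer types. DTO holds a plain list instead of a DataFrame, and HoldsLambda and round_trips are hypothetical helpers added for illustration.

```python
import pickle


class DTO:
    """Stand-in for the DTO above; data is a plain list here, not a DataFrame."""

    def __init__(self, data):
        self.data = data
        self.data_count = len(data)


class HoldsLambda:
    """Counter-example: carries an attribute that stdlib pickle rejects."""

    def __init__(self):
        self.callback = lambda x: x


def round_trips(obj):
    """Return True if obj survives a pickle dump/load cycle."""
    try:
        pickle.loads(pickle.dumps(obj))
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Attributes set in __init__ come back intact after the round trip.
restored = pickle.loads(pickle.dumps(DTO([{"id": 1}, {"id": 2}])))

dto_ok = round_trips(DTO([]))
lambda_ok = round_trips(HoldsLambda())
```

If a round trip like this fails for your own class, the usual culprit is an attribute holding something that cannot be pickled, such as an open connection or a locally defined function.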