John Ramirez
03/09/2020, 4:37 PM

Laura Lorenz (she/her)
03/09/2020, 5:12 PM
`switch` here

John Ramirez
03/09/2020, 5:34 PM

Laura Lorenz (she/her)
03/09/2020, 6:55 PM
`{'a': df1, 'b': df2}`, and then the downstream tasks index that result, like `task1(result['a'])` and `task2(result['b'])`? Basically, do the splitting itself in the upstream task.

John Ramirez
03/11/2020, 7:29 PM

Laura Lorenz (she/her)
03/11/2020, 10:35 PM
```python
import pandas as pd
from prefect import task, Flow

@task
def dataframe_stuff() -> dict:
    df = pd.DataFrame([['a', 1], ['b', 2]], columns=['group', 'data'])
    # add/do other stuff to your dataframe
    groups = df.groupby('group')
    # one sub-dataframe per group value, keyed by that value
    return {k: v for (k, v) in groups}

@task
def task1(a: pd.DataFrame) -> pd.DataFrame:
    # do stuff special to a
    return a

@task
def task2(b: pd.DataFrame) -> pd.DataFrame:
    # do stuff special to b
    return b

with Flow("my flow") as flow:
    split_dataframes = dataframe_stuff()
    # indexing is deferred: Prefect adds it as its own step that runs after dataframe_stuff
    result_on_a = task1(split_dataframes['a'])
    result_on_b = task2(split_dataframes['b'])
```
Alternatively, if `task1` and `task2` do the same thing (so it doesn't matter which one you call), you can probably use mapping instead: return a list of dataframes from `dataframe_stuff()` instead of a dictionary.
Either way, the upstream task still needs to return a structure that is already split up the way the downstream tasks need the dataframe divided.
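For comparison, here is a minimal, library-free sketch of the two upstream return shapes. It assumes plain `(group, data)` tuples stand in for dataframe rows, and `split_as_dict`, `split_as_list`, and `process` are hypothetical names, not Prefect APIs. The dict shape suits distinct downstream tasks that each pick a key; the list shape suits mapping, where the same function runs once per element. In Prefect Core that fan-out would be done with the task's `.map(...)`; this sketch only imitates it with a list comprehension.

```python
from collections import defaultdict

# Hypothetical stand-in for a dataframe: each row is a (group, data) tuple.
ROWS = [("a", 1), ("b", 2), ("a", 3)]

def split_as_dict(rows):
    """Split for distinct downstream tasks: one entry per group key."""
    groups = defaultdict(list)
    for group, data in rows:
        groups[group].append((group, data))
    return dict(groups)

def split_as_list(rows):
    """Split for mapping: the same groups, as a list in order of first appearance."""
    return list(split_as_dict(rows).values())

def process(piece):
    """Hypothetical per-group work that is identical for every group."""
    return sum(data for _, data in piece)

# Distinct tasks: each downstream step picks its own key.
by_key = split_as_dict(ROWS)
result_on_a = process(by_key["a"])
result_on_b = process(by_key["b"])

# Mapping: one call per list element, standing in for one mapped task run each.
mapped_results = [process(piece) for piece in split_as_list(ROWS)]
```

Group keys come out in order of first appearance, so `mapped_results` lines up with `['a', 'b']`.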