John Ramirez

03/09/2020, 4:37 PM
Hey everyone - how do I perform a conditional split on a single input?

Laura Lorenz (she/her)

03/09/2020, 5:12 PM
Hi! What do you mean by a conditional “split”, as in branching task trees based on the value of an input? Does https://docs.prefect.io/api/latest/tasks/control_flow.html#functions do what you want?
I'm specifically trying to link to switch there.

John Ramirez

03/09/2020, 5:34 PM
I have a pandas df with Group A and Group B. I need to pass group A to task 1 and group B to task 2.
Can the switch task work with a list?

Laura Lorenz (she/her)

03/09/2020, 6:55 PM
Is it possible to implement this so that the upstream task returns the dataframe already split in two, like {'a': df1, 'b': df2}, and the downstream tasks then index that result, like task1(result['a']) and task2(result['b'])? Basically, do the splitting itself in the upstream task.
The switch task does operate on a dictionary, but it runs only one task tree out of a set of mutually exclusive task trees. It sounds like you want to run the task trees for group A AND group B, not just one or the other.

John Ramirez

03/11/2020, 7:29 PM
Can you provide a full code sample?

Laura Lorenz (she/her)

03/11/2020, 10:35 PM
This is more pseudocode, but this is the idea: you split it yourself in an upstream task, based on whatever rules you want to split it by:
import pandas as pd
from prefect import Flow, task

@task
def dataframe_stuff() -> dict:
    df = pd.DataFrame([['a', 1], ['b', 2]], columns=['group', 'data'])
    # add/do other stuff to your dataframe
    # groupby yields (group name, sub-dataframe) pairs
    groups = df.groupby('group')
    return {k: v for (k, v) in groups}

@task
def task1(a: pd.DataFrame) -> pd.DataFrame:
    # do stuff special to a
    return a

@task
def task2(b: pd.DataFrame) -> pd.DataFrame:
    # do stuff special to b
    return b

with Flow("my flow") as flow:
    split_dataframes = dataframe_stuff()
    # index the upstream result so each task gets only its own group
    result_on_a = task1(split_dataframes['a'])
    result_on_b = task2(split_dataframes['b'])
Alternatively, if task1 and task2 do the same thing, so that calling either is the same, you can probably use mapping instead and return a list of dataframes from dataframe_stuff() instead of a dictionary. Either way, you'll still need to return some structure from the upstream task that is already split up the way its downstream tasks need it.
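A rough sketch of the list-returning variant, in plain pandas so it runs without Prefect. The names split_dataframe and process_group and the sample data are made up for illustration, and the list comprehension at the end only stands in for mapping. In a flow like the one above, the equivalent would presumably be calling process_group.map(...) on the upstream task's result.

```python
import pandas as pd


def split_dataframe(df: pd.DataFrame) -> list:
    """Split on the 'group' column and return one dataframe per group."""
    # groupby yields (key, sub-dataframe) pairs; keep only the sub-dataframes.
    return [group_df for _, group_df in df.groupby("group")]


def process_group(group_df: pd.DataFrame) -> int:
    """Hypothetical shared step: sum the 'data' column of a single group."""
    return int(group_df["data"].sum())


# Made-up sample data with two groups.
df = pd.DataFrame([["a", 1], ["b", 2], ["a", 3]], columns=["group", "data"])

parts = split_dataframe(df)
# Stand-in for mapping: run the same function once per element of the list.
totals = [process_group(part) for part in parts]
```

This only fits when every group goes through the same logic. If group A and group B need different handling, the dictionary version with separate tasks is the better match.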
Oh, and it looks like @Zachary Hughes actually got you in another thread. Thanks, Zachary!