Hello! I have a question about creating complex pr...
# prefect-community
m
Hello! I have a question about creating complex prefect flows. Is there a prefect native way to achieve a nested tree-like process, where multiple tasks are spawned from single tasks and some of them are later merged again? IDK how to best describe it and I'm not sure how detailed my description should be on this channel, but perhaps this sketching helps
g
Sounds like a use case for mapping over tasks: https://docs.prefect.io/core/concepts/mapping.html#mapping
m
Hi @Greg Roche, thanks for the reply and interest. Mapping definitely is to be used, but I believe I am in need of using the mapping in a nested approach, bu as far as I tried, this wasn't working
g
I guess it depends a lot on the specifics of your use case, but your mention of "nested approach" sounds to me like you already have something kinda-working in prefect, maybe one big task which handles a bunch of steps, and you're then trying to implement mapping within this one task. It might be a better approach to break the process down into smaller, more atomic tasks, each of which do just one thing each, and then chaining these up with a bunch of mapping and reducing to do what you want.
m
That makes sense and I generally like the approach, but im not sure how to reduce only specific outputs together, because I wouldn't want the process to wait to finish all of them before it defines a new way to map it further. Any ideas? So, some pseudo code for the above:
Copy code
def get_tiles():
    return [['T1_1','T1_2','T1_3'], ['T2_1','T2_2','T2_3'], ...]

def process_tile(tile):
    # do stuff to `tile`
    # create further tasks from single tile
    return [(tile, p1), (tile, p2), ...]

def merge_same_param_over_tiles(specific_tiles, params):
    # merge [(T1_1, p1), (T1_2, p1), (T1_3, p1)]
    return merged_output

with Flow("flow name") as flow:
   nested_list = get_tiles()  # output is nested list
   processed_tiles_with_params = process_tile(flatten(nested_list))  # input is flattened list, output is [(T1_1, p1), (T1_1, p1), ... (T1_1, p2), ... (T2_1, p1), ... (T2_2, p2), ...]
   
   ...

   # how to now reduce only specific part of `all_processed_tiles` and map over these subgroups?
   # i.e. reduce [(T1_1, p1), (T1_2, p1), ..., (T1_N, p1)]
   #      reduce [(T1_1, p2), (T1_2, p2), ..., (T1_N, p2)]
   #      ...
   #      reduce [(T2_1, pM), (T2_2, pM), ..., (T2_N, pM)]