Hello everyone, in prefect, what's the idiom to fl...
# ask-community
w
Hello everyone, in prefect, what's the idiom to flatten back the results of multiple parallel tasks from apply_map()? I've tried various combinations of syntax and unfortunately I can't seem to get it to work.
k
Do you need flatten specifically or just a reduce? Is it a nested list that needs to be flattened?
w
My pseudocode is something like this :
Copy code
list_of_files = get_files()

for file in list_of_files:
	extract()
	<some tasks>
	validate()
	if validate == fail:
		<some tasks>
	if validate == success
		<some tasks>
	results = merge()
    return result
 
combined = combine_results()
something(combined)
<some tasks>
I thought that this classifies as a 'complex mapped pipeline' as per https://docs.prefect.io/core/concepts/mapping.html#unmapped-inputs, so I create a task factory using apply_map(). I don;t know how to re-combined all the outputs of the parallel tasks though
k
I think you can just pass it to a new task that takes in a list like this:
Copy code
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge
import prefect

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    return x % 2 == 0

def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x)
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

@task
def log_result(x):
   <http://prefect.context.logger.info|prefect.context.logger.info>(x)
   return 

with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))
    log_result(result)

flow.run()
w
ill try this, thanks so much!
ughhhhh @Kevin Kho thanks yeah really was that simple, I was playing around with various combinations of reapplying, merging, and mapping hahaha
thanks so much
k
No problem!