Hello everyone. In Prefect, what's the idiom for flattening the results of multiple parallel tasks from apply_map() back into one list? I've tried various combinations of syntax, but I can't get it to work.
Kevin Kho
12/01/2021, 11:24 PM
Do you need flatten specifically or just a reduce? Is it a nested list that needs to be flattened?
Wilhelm Su
12/01/2021, 11:32 PM
My pseudocode is something like this:
list_of_files = get_files()
for file in list_of_files:
    extract()
    <some tasks>
    validate()
    if validate == fail:
        <some tasks>
    if validate == success:
        <some tasks>
    result = merge()
    return result
combined = combine_results()
something(combined)
<some tasks>
I think you can just pass it to a new task that takes in a list like this:
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge
import prefect

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    return x % 2 == 0

def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x)
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

@task
def log_result(x):
    prefect.context.logger.info(x)
    return

with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))
    log_result(result)

flow.run()
Wilhelm Su
12/01/2021, 11:34 PM
I'll try this, thanks so much!
Wilhelm Su
12/01/2021, 11:37 PM
ughhhhh @Kevin Kho thanks yeah really was that simple, I was playing around with various combinations of reapplying, merging, and mapping hahaha
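The reduce-versus-flatten distinction raised earlier in the thread can be sketched in plain Python, without Prefect. This is only an illustration of the two result shapes, not Prefect's implementation. `inc_or_negate` here is a plain-function stand-in for the mapped logic in the example above, and `split_words` is a hypothetical step invented for this sketch. When each mapped call returns a single value, the collected results are already a flat list, so passing them to one downstream task is enough. Flattening only matters when each call returns a list of its own.

```python
from itertools import chain


def inc_or_negate(x):
    # Plain-function stand-in for the thread's example:
    # increment even inputs, negate odd ones.
    return x + 1 if x % 2 == 0 else -x


def split_words(line):
    # Hypothetical per-item step that returns a list,
    # so mapping it produces a list of lists.
    return line.split()


# Case 1: one value per input. The mapped results are already flat,
# so a downstream step can consume the whole list directly (a reduce).
flat_results = [inc_or_negate(x) for x in range(4)]

# Case 2: one list per input. The mapped results are nested and need
# one level of flattening before they look like case 1.
nested_results = [split_words(line) for line in ["a b", "c"]]
flattened = list(chain.from_iterable(nested_results))

print(flat_results)
print(nested_results)
print(flattened)
```

The thread's use case is case 1, which is why handing the apply_map() result to a single non-mapped task was sufficient. For case 2, Prefect 1.x exposes a `flatten` helper (`from prefect import flatten`) for nested mapped results.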