Wilhelm Su
12/01/2021, 11:17 PM
Kevin Kho
Wilhelm Su
12/01/2021, 11:32 PM
list_of_files = get_files()
for file in list_of_files:
    extract()
    <some tasks>
    validate()
    if validate == fail:
        <some tasks>
    if validate == success:
        <some tasks>
    results = merge()
    return results
combined = combine_results()
something(combined)
<some tasks>
I thought this qualifies as a 'complex mapped pipeline' as per https://docs.prefect.io/core/concepts/mapping.html#unmapped-inputs, so I created a task factory using apply_map(). I don't know how to recombine all the outputs of the parallel tasks, though.
Kevin Kho
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge
import prefect

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    return x % 2 == 0

def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x)
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

@task
def log_result(x):
    prefect.context.logger.info(x)
    return

with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))
    log_result(result)

flow.run()
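To make the recombining step concrete, here is a rough plain-Python analogue of the flow above, with no Prefect involved. It is only a sketch of the shape (map, branch, merge, reduce), not how Prefect runs things internally. `combine_results` is a hypothetical name borrowed from the pseudocode in the question, and summing is an arbitrary stand-in for whatever the real combine step does. The point it tries to show: because `log_result` is not mapped, it receives the mapped results as one list, so a combine step can be an ordinary downstream task that takes that list.

```python
# Plain-Python analogue of the apply_map example; no Prefect required.
# All names are illustrative; combine_results is hypothetical.

def inc(x):
    return x + 1

def negate(x):
    return -x

def inc_or_negate(x):
    # Only one branch runs per element, like the two `case` blocks;
    # the single returned value plays the role of merge(res1, res2).
    return inc(x) if x % 2 == 0 else negate(x)

def combine_results(results):
    # A non-mapped step sees the whole list of per-element results.
    # Summing is an arbitrary placeholder for the real combine logic.
    return sum(results)

# Analogous to apply_map(inc_or_negate, range(4))
mapped = [inc_or_negate(x) for x in range(4)]

# Analogous to passing the mapped result to a regular (non-mapped) task
combined = combine_results(mapped)

print(mapped, combined)
```

In the Prefect version, the equivalent would presumably be a `@task` that accepts the output of `apply_map` and is called without `.map`, in the same position as `log_result`.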
Wilhelm Su
12/01/2021, 11:34 PM
Wilhelm Su
12/01/2021, 11:37 PM
Wilhelm Su
12/01/2021, 11:37 PM
Kevin Kho