Sven Teresniak
07/23/2020, 7:59 AM
def complex_task_generating_function(singleelement):
    with case(sometask, foo):
        anothertask(…)
        …
with Flow("foo") as flow:
    param = Parameter("param", required=False)
    # generates a list of strings, based on param. len is 0…n
    elements_to_process = maybe_generate_work_items(param)
    # when this evaluates to False, all the following is skipped, the apply_map as well!
    with case(isempty_task(elements_to_process), True):
        # now I either want to add one default element or
        # somehow do the processing based on the following result
        generated_default = default_value_generator_task()
        # maybe so?
        elements_to_process = task(lambda x: [x])(generated_default)
    # now the tricky part.
    # elements_to_process is either a list or just one (runtime dependent) default value
    result = apply_map(complex_task_generating_function, elements_to_process)
The problem is: apply_map does not know skip_on_upstream_skip. I cannot just use map() because complex_task_generating_function is not a task (it's the beef of the flow, so to say, and in fact the logic of the flow). I found a workaround by doing something like this:
@task(name="hack", skip_on_upstream_skip=False)
def merger_hack(elements, default):
    # fall back to the default when there are no work items
    return elements or [default]

with Flow("foo") as flow:
    param = Parameter("param", required=False)  # same as above
    elements_to_process = maybe_generate_work_items(param)  # same as above
    with case(isempty_task(elements_to_process), True):
        generated_default = default_value_generator_task()
    final_list = merger_hack(elements_to_process, generated_default)
    result = apply_map(complex_task_generating_function, final_list)
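The fallback inside merger_hack can be checked on its own, without Prefect. This is only a plain-Python sketch: the @task decorator is dropped, and it assumes that a skipped default branch reaches the function as None, which I have not confirmed for every Prefect version.

```python
from typing import List, Optional


def merger_hack(elements: Optional[List[str]], default: Optional[str]) -> List[Optional[str]]:
    # Plain-function stand-in for the task above (no @task decorator here).
    # An empty list or None is falsy, so the default is wrapped in a list instead.
    return elements or [default]


# Path 1: work items exist; the default branch was skipped (modelled as None, an assumption).
print(merger_hack(["a", "b"], None))   # -> ['a', 'b']

# Path 2: no work items; the default branch ran and produced a value.
print(merger_hack([], "fallback"))     # -> ['fallback']
```

Note that the function still looks at the data (is the list empty?) rather than at which path the flow actually took.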
But writing code like the hack task, which basically checks whether or not the flow ran through the isempty case, seems odd. I don't want to "check" whether the flow took one path or another. The run path through the flow should decide this. How can I write this elegantly and easily?
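To make "the run path should decide" concrete, here is roughly the behaviour I am after, as a plain-Python sketch with no Prefect involved. SKIPPED, run_branches and first_branch_that_ran are hypothetical names made up for this illustration; they are not Prefect API.

```python
from typing import Callable, List, Tuple

# Hypothetical sentinel marking a branch that did not run.
SKIPPED = object()


def run_branches(elements: List[str], make_default: Callable[[], str]) -> Tuple[object, object]:
    # Exactly one of the two branches runs, mirroring two complementary case blocks.
    is_empty = len(elements) == 0
    default_branch = [make_default()] if is_empty else SKIPPED
    passthrough_branch = elements if not is_empty else SKIPPED
    return default_branch, passthrough_branch


def first_branch_that_ran(*results: object) -> object:
    # Picks the result of whichever branch ran; it never inspects the data itself.
    for result in results:
        if result is not SKIPPED:
            return result
    return SKIPPED


print(first_branch_that_ran(*run_branches([], lambda: "fallback")))    # -> ['fallback']
print(first_branch_that_ran(*run_branches(["a", "b"], lambda: "x")))   # -> ['a', 'b']
```

The selection step here only asks which branch ran, so the emptiness check lives in one place (the condition) instead of being repeated inside a helper task.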
Sorry for the long question, but I want to learn how to use Prefect properly because I'm going to write a lot of flows in the future.