Sven Teresniak07/23/2020, 7:59 AM
def complex_task_generating_function(singleelement): case(sometask, foo): anothertask(…) … with Flow("foo") as flow: param = Parameter("param", required=False) # generates a list of strings, based on param. len is 0…n elements_to_process = maybe_generate_work_items(param) # when this evaluates to False, all the following is skipped, the apply_map as well! case(isempty_task(elements_to_process), True): # now I either want to add one default element or # somehow do the processing based on the following result generated_default = default_value_generator_task() # maybe so? elements_to_process = task(lambda x: [x])(generated_default) # now the tricky part. # elements_to_process is either a list or just one (runtime dependent) default value result = apply_map(complex_task_generating_function, elements_to_process)
does not know
. I cannot just use
is not a task (its the beef of the flow so to say and in fact the logic of the flow). I found a workaround by doing something like this:
But to write code like the hack-task that basically checks if the flow ran through the isempty-case or not seems odd. I don't want to "check" whether or not the flow used one path or another. The run path through the flow should decide this. How can I write this elegant and easy? Sorry for the long question but I want to learn how to use Prefect properly because in the future I'm going to write a lot of flows.
@task(name="hack", skip_on_upstream_skip=False) def merger_hack(elements, default): return elements or [default] with Flow("foo") as flow: param = Parameter("param", required=False) # same as above elements_to_process = maybe_generate_work_items(param) # same as above case(isempty_task(elements_to_process), True): generated_default = default_value_generator_task() final_list = merger_hack(elements_to_process, generated_default) result = apply_map(complex_task_generating_function, final_list)