Roy
08/10/2020, 11:00 AM
When using `merge` to combine two task results into a single one, the merge is supposed to return the first "real" result. What is considered a "real" result? It seems that only `None` is considered a non-real result. I would have expected that `NoResult` results would also be skipped, but this is not the case. Is this intended? Example in thread.

import prefect

@prefect.task
def task_skip():
    # Create a situation where a `NoResult` is returned
    raise prefect.engine.signals.SKIP()

@prefect.task
def task_5():
    return 5

with prefect.Flow(name="flow") as flow:
    ts = task_skip()
    t5 = task_5()
    res = prefect.tasks.control_flow.merge(ts, t5)

state = flow.run()
print(state.result[res].result)
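
The behavior described above (only `None` is treated as a non-real result) can be modeled in plain Python. This is only a rough sketch of the observed semantics, not Prefect's actual implementation: `first_real_result` and `NO_RESULT` are hypothetical names, and `NO_RESULT` is just a stand-in object, not Prefect's `NoResult`.

```python
# Hypothetical stand-in for a "no result" marker; NOT Prefect's NoResult class.
NO_RESULT = object()


def first_real_result(*results):
    """Return the first argument that is not None, or None if there is none."""
    return next((r for r in results if r is not None), None)


# None is skipped, so the second value is returned.
print(first_real_result(None, 5))

# The stand-in marker is not None, so it is returned instead of 5.
print(first_real_result(NO_RESULT, 5) is NO_RESULT)
```

Under this model, anything other than `None` counts as "real", which matches what the example above appears to show.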
itay livni
08/10/2020, 1:10 PM
`merge` is traditionally used after an `ifelse` or `switch`, where the first param is the result of the true branch and the second param is the result of the false branch.
Roy
08/10/2020, 1:57 PM
A task that is `skipped` will not be considered in the merge, even though it may have run. The example above is a simplification of what I am trying to do. I have two branches, but one of them may manually raise a `SKIP` signal that contains a `result`. In that case, both tasks passed to `merge` are skipped and the result is `None`. Updated example:

from prefect import Flow, task
from prefect.engine import signals
from prefect.tasks.control_flow import case, merge

@task
def condition():
    return True

@task
def get_value():
    # file_exists, load_file and compute_result are placeholders for the real logic
    if file_exists("..."):
        value = load_file("...")
        raise signals.SKIP("Nothing to do here.", result=value)
    return compute_result("...")

@task
def false_task():
    return 5

with Flow(name="flow") as flow:
    cond = condition()
    with case(cond, True):
        res_t = get_value()
    with case(cond, False):
        res_f = false_task()
    res = merge(res_t, res_f)

state = flow.run()
print(state.result[res].result)

I expected this to return `value`, but the `merge` ignores this `skipped` task's `result`.