Joao Erik Melo
02/05/2021, 8:27 PM
from prefect import Flow, case

# check_condition1..3 and task1..3 are @task functions defined elsewhere
with Flow("conditional-branches") as flow:
    cond1 = check_condition1()
    with case(cond1, True):
        val1 = task1()
    cond2 = check_condition2()
    with case(cond2, True):
        val2 = task2()
    cond3 = check_condition3()
    with case(cond3, True):
        val3 = task3()
    # Chain the branches explicitly: task1 -> task2 -> task3
    flow.set_dependencies(
        task=val1,
        downstream_tasks=[val2])
    flow.set_dependencies(
        task=val2,
        downstream_tasks=[val3])
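With Prefect's default skip_on_upstream_skip = True, a task whose upstream is SKIPPED is skipped as well, so the two set_dependencies calls above chain the branches: if task1 is skipped, task2 and task3 follow. The sketch below models only those explicit edges as a plain dict (a hypothetical representation, not Prefect's internal graph; the checks that case() adds are left out) and walks them breadth-first to list what ends up skipped.

```python
from collections import deque
from typing import Dict, List, Set

# Hypothetical plain-dict model of the two flow.set_dependencies(...) calls:
# each task maps to the tasks directly downstream of it.
# The checks that case() adds in front of each task are not modeled here.
DOWNSTREAM: Dict[str, List[str]] = {
    "task1": ["task2"],
    "task2": ["task3"],
    "task3": [],
}


def skipped_tasks(first_skipped: str, downstream: Dict[str, List[str]]) -> Set[str]:
    """Tasks that end up SKIPPED once `first_skipped` is skipped, assuming
    every task keeps the default skip_on_upstream_skip=True."""
    skipped = {first_skipped}
    queue = deque([first_skipped])
    while queue:  # breadth-first walk along the dependency edges
        current = queue.popleft()
        for child in downstream.get(current, []):
            if child not in skipped:
                skipped.add(child)
                queue.append(child)
    return skipped


print(sorted(skipped_tasks("task1", DOWNSTREAM)))  # ['task1', 'task2', 'task3']
print(sorted(skipped_tasks("task2", DOWNSTREAM)))  # ['task2', 'task3']
```

The traversal order does not affect the result; any walk that visits every reachable task yields the same set.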
nicholas
02/05/2021, 8:51 PM
from prefect import Flow, case

with Flow("conditional-branches") as flow:
    cond1 = check_condition1()
    with case(cond1, True):
        val1 = task1()
    cond2 = check_condition2()
    with case(cond2, True):
        # upstream_tasks adds a state dependency without passing data
        val2 = task2(upstream_tasks=[val1])
    cond3 = check_condition3()
    with case(cond3, True):
        val3 = task3(upstream_tasks=[val1, val2])
combined with a not_all_skipped trigger on task2 and task3
Joao Erik Melo
02/08/2021, 2:04 PM
skip_on_upstream_skip = False, then even Task2 is NOT skipped, although it should be.
And if I run the same experiment but set skip_on_upstream_skip = True, all downstream tasks after Task2 are SKIPPED too (image2), not only Task2 as I would like.
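For reference, Prefect's (pre-2.0) not_all_skipped trigger, importable from prefect.triggers, lets a task run when every upstream task finished successfully, with a skip counting as a success, unless all of the upstreams were skipped. The plain-Python model below is not Prefect's code (State and not_all_skipped_model are hypothetical names). It suggests why Task2 still runs, assuming its upstreams are the check added by case(cond2, True), which is skipped in the run below because cond2_param is "False", and task1, which succeeded: not all of them were skipped. As I understand the task runner, the trigger is only consulted once skip_on_upstream_skip = False; with the default True, a single skipped upstream skips the task before the trigger is checked.

```python
from enum import Enum
from typing import Iterable


class State(Enum):
    """Simplified, hypothetical stand-ins for Prefect task states."""
    SUCCESS = "Success"
    SKIPPED = "Skipped"
    FAILED = "Failed"


def not_all_skipped_model(upstream_states: Iterable[State]) -> str:
    """Model of the not_all_skipped rule: run if every upstream finished
    successfully (a skip counts as a success) and they were not ALL skipped."""
    states = list(upstream_states)
    if all(s is State.SKIPPED for s in states):
        return "SKIP"  # every upstream was skipped
    if not all(s in (State.SUCCESS, State.SKIPPED) for s in states):
        return "TRIGGER_FAILED"  # some upstream failed
    return "RUN"


# Task2 in the experiment: its case(cond2, True) check was skipped,
# but task1 (its data upstream) succeeded.
print(not_all_skipped_model([State.SKIPPED, State.SUCCESS]))  # RUN
print(not_all_skipped_model([State.SKIPPED, State.SKIPPED]))  # SKIP
```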
from prefect import Flow, Parameter, case

# check_condition1..4 and task1..4 are @task functions defined elsewhere
with Flow("conditional-branches") as flow:
    cond1_param = Parameter(name="cond1_param")
    cond2_param = Parameter(name="cond2_param")
    cond3_param = Parameter(name="cond3_param")
    cond4_param = Parameter(name="cond4_param")
    cond1 = check_condition1(cond1_param)
    with case(cond1, True):
        val1 = task1()
    cond2 = check_condition2(cond2_param)
    with case(cond2, True):
        val2 = task2(val1)
    cond3 = check_condition3(cond3_param)
    with case(cond3, True):
        val3 = task3(val2)
    cond4 = check_condition4(cond4_param)
    with case(cond4, True):
        val4 = task4(val3)

# Run locally with cond2 disabled, then render the resulting task states
flow_result = flow.run(cond1_param="True", cond2_param="False", cond3_param="True", cond4_param="True")
flow.visualize(flow_state=flow_result)
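The run above can be reproduced with a small, simplified model (not Prefect's engine; simulate_chain is a hypothetical helper). It assumes each case(cond_i, True) acts like a gate upstream of task_i that ends SKIPPED when cond_i is false, that task_i also depends on task_{i-1} through its argument, that nothing fails, and that the default all_successful trigger treats a SKIPPED upstream as successful.

```python
from typing import Dict, List, Optional

SKIPPED, SUCCESS = "Skipped", "Success"


def simulate_chain(conditions: List[bool], skip_on_upstream_skip: List[bool]) -> Dict[str, str]:
    """Simplified model (not Prefect's engine) of the four-task flow above.

    task_i sits under case(cond_i, True), modeled as a gate upstream of it,
    and task_i also depends on task_{i-1} through its argument. Assumes no
    failures and the default all_successful trigger, where a SKIPPED
    upstream still counts as successful.
    """
    states: Dict[str, str] = {}
    previous: Optional[str] = None
    for i, (cond, skip_flag) in enumerate(zip(conditions, skip_on_upstream_skip), start=1):
        gate = SUCCESS if cond else SKIPPED  # the case() check skips when cond is False
        upstream = [gate] + ([states[previous]] if previous else [])
        name = f"task{i}"
        if skip_flag and SKIPPED in upstream:
            states[name] = SKIPPED  # any skipped upstream propagates the skip
        else:
            states[name] = SUCCESS  # trigger passes, so the task runs
        previous = name
    return states


conds = [True, False, True, True]  # cond2_param="False", as in the run above
print(simulate_chain(conds, [True, True, True, True]))      # default everywhere
print(simulate_chain(conds, [False, False, False, False]))  # flag off everywhere
print(simulate_chain(conds, [True, True, False, False]))    # flag off on task3/task4 only
```

In this model the first call skips task2, task3 and task4 (the image2 situation), the second runs all four (Task2 is no longer skipped), and the third skips only task2. The catch, visible in the model, is that a task with the flag set to False also ignores a skip coming from its own case() check.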
skip_on_upstream_skip = False in a Task, this Task is never SKIPPED, even if it is under a condition that is not satisfied. Does that make sense?
It seems that a Task is skipped just because its previous task (the case(True) task) was skipped. Is that the expected case() behavior in Prefect?
nicholas
02/13/2021, 6:16 AM
skip_on_upstream_skip = True
for all tasks you intend to run even if their upstream doesn't
SKIPPED for any reason; for any task where you don't want this to be the case, you'll need to explicitly tell Prefect that that's the case
Joao Erik Melo
02/17/2021, 5:18 PM
1. By "you'll need to explicitly tell Prefect that that's the case", do you mean setting skip_on_upstream_skip = False? Or is there another way to tell Prefect that I want a task not to be skipped, even if its upstream task was skipped? Is there any Signal to do that?
I know the opposite is possible: I can raise a SKIP signal inside a task based on an input parameter to force that task to skip. That will probably be my approach if there isn't a better alternative.
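A minimal sketch of that approach, with the skip decision made inside the task from an input parameter. In Prefect's pre-2.0 API the signal would be raised as raise SKIP("...") after from prefect.engine.signals import SKIP; here SkipSignal, task2_body, run_task and the run_task2 parameter are hypothetical stand-ins so the sketch runs without Prefect.

```python
class SkipSignal(Exception):
    """Hypothetical stand-in for Prefect's prefect.engine.signals.SKIP,
    so this sketch runs without Prefect installed."""


def task2_body(upstream_value: int, run_task2: bool) -> int:
    # The task itself decides, from an input parameter, whether to skip.
    if not run_task2:
        raise SkipSignal("run_task2 parameter is False")
    return upstream_value * 2


def run_task(fn, *args):
    """Tiny hypothetical runner: a raised SkipSignal becomes a 'Skipped' state."""
    try:
        return "Success", fn(*args)
    except SkipSignal:
        return "Skipped", None


print(run_task(task2_body, 21, True))   # ('Success', 42)
print(run_task(task2_body, 21, False))  # ('Skipped', None)
```

A task skipped this way is still a SKIPPED upstream for whatever depends on it, so the skip_on_upstream_skip setting on its downstream tasks still decides whether they run.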
2. To keep the data flowing even if a task in the middle is skipped, I'm thinking of using a shared space, like an S3 bucket, that all tasks read from and write to.
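A minimal sketch of the shared-space idea, assuming a single "latest" key that each task overwrites when it actually runs (a hypothetical convention). A temporary local directory stands in for the S3 bucket so the sketch runs offline; with boto3 the write and read would become put_object and get_object calls on an S3 client. SharedStore and the task bodies are hypothetical.

```python
import json
import tempfile
from pathlib import Path


class SharedStore:
    """Hypothetical local-directory stand-in for a shared S3 bucket:
    tasks write their output under a key and read their input by key."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def write(self, key: str, value) -> None:
        (self.root / f"{key}.json").write_text(json.dumps(value))

    def read(self, key: str, default=None):
        path = self.root / f"{key}.json"
        return json.loads(path.read_text()) if path.exists() else default


def task1(store: SharedStore) -> None:
    store.write("latest", [1, 2, 3])


def task2(store: SharedStore) -> None:
    store.write("latest", [x * 10 for x in store.read("latest", [])])


def task3(store: SharedStore) -> int:
    # Reads whatever the most recent task that actually ran wrote,
    # so it works whether or not task2 was skipped.
    return sum(store.read("latest", []))


with tempfile.TemporaryDirectory() as tmp:
    store = SharedStore(Path(tmp))
    task1(store)
    # task2 is skipped in this run, so it never touches "latest".
    print(task3(store))  # 6
```

A single overwritten key is the simplest choice; keying by task name and falling back to the previous task's key would keep every intermediate result instead.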
Is there another way to keep the data flowing even when a task is SKIPPED?
nicholas
02/17/2021, 5:21 PM
skip_on_upstream_skip = False on every task you want to run regardless of an upstream skip.
2. I think your approach of using an S3 bucket is reasonable: since tasks are encapsulated functions with defined inputs and outputs, there's no concept of "data flow" in Prefect except between direct dependencies.
Joao Erik Melo
02/17/2021, 5:52 PM