# prefect-server
j
Hi there, I'm new to Prefect and I'm facing some issues with a Flow I need to implement. I need a Flow with a few tasks in sequence (Task1 -> Task2 -> Task3), but each Task can be SKIPPED depending on its own condition. Each task that is not skipped should process a dataset and pass the processed data to the next task that is NOT SKIPPED. There are two main issues: 1. If Task1 and Task3 are forced to run and Task2, in the middle, is forced to be SKIPPED, the downstream Task3 is SKIPPED too (if skip_on_upstream_skip=True). However, if I set skip_on_upstream_skip to False (in all 3 tasks), Task2 is NOT skipped as it should be [image]. 2. How can I keep the data flowing through a sequence of tasks if one of them is skipped? When a task is skipped, I lose the data flow through it. Would the best practice be to use a shared path for all tasks (a Hive table that is read and overwritten by each task, for example), or is there a more Prefectonic way to do that? Could someone help me, please?
with Flow("conditional-branches") as flow:

    cond1 = check_condition1()
    with case(cond1, True):
        val1 = task1()

    cond2 = check_condition2()
    with case(cond2, True):
        val2 = task2()

    cond3 = check_condition3()
    with case(cond3, True):
        val3 = task3()



    flow.set_dependencies(task=val1, downstream_tasks=[val2])
    flow.set_dependencies(task=val2, downstream_tasks=[val3])
n
Hi @Joao Erik Melo - I'm a little hazy on what you're trying to do, but it looks like you want data to continue flowing through a pipeline where each task's return depends on a series of upstream tasks BUT shouldn't be completely dependent on them. I wonder if a pattern like this might work:
with Flow("conditional-branches") as flow:

    cond1 = check_condition1()
    with case(cond1, True):
        val1 = task1()

    cond2 = check_condition2()
    with case(cond2, True):
        val2 = task2(upstream_tasks=[val1])

    cond3 = check_condition3()
    with case(cond3, True):
        val3 = task3(upstream_tasks=[val1, val2])
combined with a not_all_skipped trigger on task2 and task3. But in the sample you provided you aren't passing any data, so that part is unclear to me.
j
Hi @nicholas, thanks a lot for your answer. I'm sending a more complete example. What I actually need is this: if all conditions are true, all Tasks should run, each passing its output data to the next task. But if one of the tasks is SKIPPED, the data should keep flowing from Task1 to Task4, just bypassing the SKIPPED Tasks. My desired behavior in this example is: if I force Task2 to be SKIPPED, the output data from Task1 has to flow to Task3, and the Task3 output goes to Task4, always in this order. PROBLEM: As you can see in image1, if I force Task2 to be SKIPPED and all other tasks to NOT skip (setting skip_on_upstream_skip = False on all tasks), then even Task2 is NOT skipped as it should be. And if I run the same experiment with skip_on_upstream_skip = True, all downstream tasks after Task2 are SKIPPED too (image2), not only Task2 as I would like.
with Flow("conditional-branches") as flow:

    cond1_param = Parameter(name="cond1_param")
    cond2_param = Parameter(name="cond2_param")
    cond3_param = Parameter(name="cond3_param")
    cond4_param = Parameter(name="cond4_param")

    cond1 = check_condition1(cond1_param)
    with case(cond1, True):
        val1 = task1()

    cond2 = check_condition2(cond2_param)
    with case(cond2, True):
        val2 = task2(val1)

    cond3 = check_condition3(cond3_param)
    with case(cond3, True):
        val3 = task3(val2)

    cond4 = check_condition4(cond4_param)
    with case(cond4, True):
        val4 = task4(val3)


flow_result = flow.run(cond1_param="True", cond2_param="False", cond3_param="True", cond4_param="True")
flow.visualize(flow_state=flow_result)
@nicholas in this example of mine, where the "ELSE" branch of the case statement (i.e. case(False)) doesn't exist, if I set skip_on_upstream_skip = False on a Task, that Task is never SKIPPED, even if it sits under a condition that is not satisfied. Does that make sense? It seems that a Task is skipped just because its previous task (the case(True) task) was skipped. Is that the case() behavior in Prefect?
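The desired behavior described above (a disabled step is bypassed while the data keeps moving from Task1 towards Task4) can be sketched without any SKIPPED state at all: each step receives the data plus its condition and hands the input through unchanged when the condition is off. This is a plain-Python illustration of that pass-through idea, not something proposed in the thread. The helper `run_if`, the `Dataset` alias, the sample data, and the bodies of `task2`, `task3` and `task4` are all hypothetical.

```python
from typing import Callable, List

Dataset = List[int]  # hypothetical stand-in for whatever the tasks exchange


def run_if(enabled: bool, step: Callable[[Dataset], Dataset], data: Dataset) -> Dataset:
    """Apply `step` when enabled; otherwise hand the input through unchanged."""
    return step(data) if enabled else data


# Hypothetical processing steps, named after the tasks in the thread.
def task2(data: Dataset) -> Dataset:
    return [x * 2 for x in data]


def task3(data: Dataset) -> Dataset:
    return [x + 1 for x in data]


def task4(data: Dataset) -> Dataset:
    return [x for x in data if x > 2]


def pipeline(cond2: bool, cond3: bool, cond4: bool) -> Dataset:
    data = [1, 2, 3]  # plays the role of task1's output
    data = run_if(cond2, task2, data)  # a disabled step is simply bypassed
    data = run_if(cond3, task3, data)
    data = run_if(cond4, task4, data)
    return data
```

With `pipeline(False, True, True)`, the second step is bypassed and the third step works directly on the first step's output. The trade-off is that a bypassed step finishes as a normal success rather than showing up as SKIPPED in the flow visualization.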
n
Hi @Joao Erik Melo - sorry for the slow response, I've been sporadically AFK here; let me give this a harder look and I'll have an answer for you as soon as I can!
Hi @Joao Erik Melo - I've looked into this a bit more, and it seems that to get what you're looking for you need to set skip_on_upstream_skip = False for all tasks you intend to run even if their upstream doesn't. The default behavior for tasks is to skip if their upstream tasks are SKIPPED for any reason - for any task where you don't want this to be the case, you'll need to explicitly tell Prefect that that's the case.
j
Hi @nicholas, thanks for your response. 1. When you say "you'll need to explicitly tell Prefect that that's the case", do you mean setting skip_on_upstream_skip = False? Or is there another way to tell Prefect that I don't want a task to be skipped even if its upstream task was skipped? Is there any Signal to do that? I know the opposite is possible: I can raise a SKIP Signal inside a task based on an input parameter, which lets me force a task to skip. That will probably be my approach if there isn't a better alternative. 2. To keep the data flowing even if a task in the middle is skipped, I'm thinking of using a shared space, like an S3 bucket, that all tasks read from and write to. Is there another way to keep the data flowing even when a task is SKIPPED?
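The approach mentioned here (raising a SKIP signal inside a task based on an input parameter) could look roughly like the sketch below, assuming the Prefect 1.x API used elsewhere in this thread. The flow name, the `run_task2` parameter, the task bodies, and the wrapper `build_flow` are hypothetical. The fallback in `task3` relies on the assumption that a skipped upstream task hands `None` to a downstream task that has skip_on_upstream_skip=False. The Prefect imports sit inside the function only so the snippet can be loaded without Prefect installed.

```python
def build_flow():
    """Build a small Prefect 1.x flow in which task2 can skip itself."""
    # Local imports: a choice made for this sketch, not a Prefect requirement.
    from prefect import Flow, Parameter, task
    from prefect.engine.signals import SKIP

    @task
    def task1():
        return [1, 2, 3]

    @task
    def task2(data, enabled):
        if not enabled:
            # Ends this task run in a Skipped state.
            raise SKIP("task2 disabled by parameter")
        return [x * 2 for x in data]

    # skip_on_upstream_skip=False: still run when task2 ends up Skipped.
    @task(skip_on_upstream_skip=False)
    def task3(original, processed):
        # Assumption: a skipped upstream task delivers None as its result,
        # so fall back to task1's output in that case.
        data = original if processed is None else processed
        return [x + 1 for x in data]

    with Flow("skip-inside-task") as flow:
        run_task2 = Parameter("run_task2", default=True)
        val1 = task1()
        val2 = task2(val1, run_task2)
        val3 = task3(val1, val2)

    return flow, val3
```

A possible way to try it: `flow, val3 = build_flow()`, then `state = flow.run(parameters={"run_task2": False})` and inspect `state.result[val3]`. Because the skip decision lives inside task2 rather than in a case block, setting skip_on_upstream_skip=False on task3 does not interfere with task2 being skipped.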
n
1. Yes, you'll want to set skip_on_upstream_skip = False on every task you want to run regardless of upstream skips. 2. I think your approach of using an S3 bucket is reasonable - since tasks are encapsulated functions with defined inputs and outputs, there's no concept of "data flow" in Prefect, except between direct dependencies.
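The shared-space idea from point 2 can be illustrated with a minimal, Prefect-free sketch: every step reads the dataset from one agreed location and overwrites it, and a skipped step simply leaves it untouched. A temporary JSON file stands in here for the S3 bucket or Hive table. The helper names (`read_shared`, `write_shared`, `step`, `run_pipeline`), the sample data, and the transforms are all hypothetical.

```python
import json
import tempfile
from pathlib import Path


def read_shared(path: Path) -> list:
    """Load the current dataset from the shared location."""
    return json.loads(path.read_text())


def write_shared(path: Path, data: list) -> None:
    """Overwrite the shared location with the new dataset."""
    path.write_text(json.dumps(data))


def step(path: Path, enabled: bool, transform) -> None:
    """Read, transform and overwrite the shared dataset; do nothing when disabled."""
    if not enabled:
        return  # a skipped step leaves the shared dataset untouched
    write_shared(path, transform(read_shared(path)))


def run_pipeline(cond2: bool, cond3: bool) -> list:
    with tempfile.TemporaryDirectory() as tmp:
        shared = Path(tmp) / "dataset.json"  # stand-in for an S3 key or Hive table
        write_shared(shared, [1, 2, 3])  # plays the role of task1's output
        step(shared, cond2, lambda d: [x * 2 for x in d])  # task2
        step(shared, cond3, lambda d: [x + 1 for x in d])  # task3
        return read_shared(shared)
```

Calling `run_pipeline(False, True)` skips the second step, so the third step picks up the first step's output from the shared file. In a real flow each step would be its own task, and the tasks would still need explicit ordering dependencies so they touch the shared location in sequence.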
j
Nice! Thanks a lot for your help, @nicholas!