Rhys Mansal

    4 months ago
    Hi all, I'm having difficulty with the following code.
    param_a = flow.add_task(Parameter("param_a", default=True))
    with case(param_a, False):
        task_a = some_task(arg)
    task_b = some_task(task_a)
    some_task is being run multiple times with different arguments. When param_a is false some of these should be skipped. If I do not use @task(skip_on_upstream_skip=False) in the task decorator on some_task, then all downstream tasks from the first skipped task are skipped, whether they are inside a case or not. If I do, none of them are skipped regardless of what param_a is set to. Does anyone have any idea how to get only the tasks inside the with block to skip (and only when param_a is true)?
    Anna Geller

    4 months ago
    You don't need flow.add_task(parameter_task) if you are passing the parameter to downstream tasks. You can explicitly define your flow structure (DAG) based on the value of the parameter, using two separate blocks for True and False:
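    from prefect import Flow, Parameter, case

    # some_task and arg are assumed to be defined elsewhere
    with Flow("x") as flow:
        param_a = Parameter("param_a", default=True)
        # Runs only when param_a is False
        with case(param_a, False):
            task_a = some_task(arg)
            task_b = some_task(task_a)
        # Runs only when param_a is True
        with case(param_a, True):
            some_task()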
    This way, you can skip a specific task based on the parameter value. For more on parameters, check this post.
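To make the skip behaviour from the question easier to reason about, here is a toy model in plain Python. It is not Prefect code and does not import Prefect. ToyTask, run and build are hypothetical names invented for this sketch. It assumes that a case block behaves like a condition task sitting upstream of everything inside the block, and that a task with skip_on_upstream_skip=True is skipped whenever any of its upstream tasks was skipped.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not a Prefect class."""
    name: str
    upstream: List[str] = field(default_factory=list)
    skip_on_upstream_skip: bool = True
    # Stands in for a case condition that did not match.
    always_skip: bool = False


def run(tasks: List[ToyTask]) -> Dict[str, str]:
    """Walk tasks in dependency order and record 'ran' or 'skipped'."""
    states: Dict[str, str] = {}
    for t in tasks:
        upstream_skipped = any(states[u] == "skipped" for u in t.upstream)
        if t.always_skip or (t.skip_on_upstream_skip and upstream_skipped):
            states[t.name] = "skipped"
        else:
            states[t.name] = "ran"
    return states


def build(param_a: bool, a_flag: bool, b_flag: bool) -> List[ToyTask]:
    """Model the question's layout: task_a inside case(param_a, False),
    task_b outside the block but downstream of task_a."""
    # The condition matches only when param_a is False.
    cond = ToyTask("case_condition", always_skip=(param_a is not False))
    task_a = ToyTask("task_a", ["case_condition"], skip_on_upstream_skip=a_flag)
    task_b = ToyTask("task_b", ["task_a"], skip_on_upstream_skip=b_flag)
    return [cond, task_a, task_b]


if __name__ == "__main__":
    # Default flags everywhere: the skip propagates to task_b.
    print(run(build(True, True, True)))
    # Flag disabled everywhere: task_a ignores the skipped condition too.
    print(run(build(True, False, False)))
    # Flag disabled only on the downstream task: only task_a is skipped.
    print(run(build(True, True, False)))
```

In this model, with param_a set to True, the default flags skip both task_a and task_b, disabling the flag on both lets both run, and disabling it only on task_b skips task_a alone. If the model matches how your flow behaves, that suggests using a separately configured task for the call outside the with block, rather than one shared some_task decorator for every call.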
    Rhys Mansal

    4 months ago
    thank you