Thread
#prefect-community
    Constantino Schillebeeckx

    9 months ago
    I'm trying to do a conditional flow like the one shown below:
    import prefect
    from prefect import Flow, Parameter, case, task

    @task
    def check_cond():
        return False
    
    
    @task
    def run_task(anchor_date, days_from):
    
        logger = prefect.context.logger
        logger.critical(f"{days_from=}")
    
    
    with Flow('foo') as flow:
    
        anchor_date = Parameter(name="anchor_date", default=None)
        cond = check_cond()
    
        with case(cond, True):
            anchor_date = run_task(anchor_date, days_from=0)
    
        with case(cond, False):
            anchor_date = run_task(anchor_date, days_from=-1)
    however the False condition never gets run. When check_cond returns True, the flow runs as expected. Am I missing something?
    Kevin Kho

    9 months ago
    I am looking into it. I think the second run_task is getting two dependencies and one is skipped, causing it to skip as well.
    OK, I think I understand it now. The issue is that you overwrite anchor_date in the True condition, so in the case(cond, False) block anchor_date refers to the output of the first run_task rather than the Parameter, because of how the DAG is constructed. You need to change the name like this:
    with case(cond, True):
        z = run_task(anchor_date, days_from=0)

    with case(cond, False):
        z = run_task(anchor_date, days_from=-1)
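To illustrate the renaming advice, here is a plain-Python sketch (not Prefect code) of how calling a task while building a flow records a graph edge instead of running anything. The `Node` class and `call` helper are hypothetical stand-ins invented for this illustration; they only mimic the idea of DAG construction.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for a task added to a DAG at build time."""
    name: str
    upstream: list = field(default_factory=list)


def call(name, *inputs):
    # "Calling" a task only records which nodes feed it; nothing runs yet.
    return Node(name, upstream=list(inputs))


# Buggy pattern from the thread: the name is rebound in the first branch.
anchor_date = call("anchor_date_param")
param = anchor_date  # keep a handle on the original node for comparison
anchor_date = call("run_task_true", anchor_date)
second = call("run_task_false", anchor_date)
# The False-branch node now depends on the True-branch node, not the parameter.
buggy_upstream = [n.name for n in second.upstream]

# Fixed pattern: fresh names keep both branches wired to the parameter.
z_true = call("run_task_true", param)
z_false = call("run_task_false", param)
fixed_upstream = [n.name for n in z_false.upstream]

print(buggy_upstream)  # ['run_task_true']
print(fixed_upstream)  # ['anchor_date_param']
```

In the buggy wiring, the False branch sits downstream of the True branch, so when the True branch is skipped the False branch inherits that skip.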
    Constantino Schillebeeckx

    9 months ago
    Beautiful, you're a lifesaver
    sorry, I'm still having issues with this:
    @task
    def check_cond():
        return True
    
    
    @task
    def run_task(anchor_date, days_from):
    
        logger = prefect.context.logger
        logger.critical(f"{days_from=}")
    
        return days_from
    
    
    @task
    def final(anchor):
        logger = prefect.context.logger
        logger.critical(f"{anchor=}")
    
    
    with Flow('foo') as flow:
    
        anchor_date = Parameter(name="anchor_date", default=None)
        cond = check_cond()
    
        with case(cond, True):
            z = run_task(anchor_date, days_from=0)
    
        with case(cond, False):
            z = run_task(anchor_date, days_from=-1)
    
        final(z)
    now under the True condition, task final gets skipped
    Kevin Kho

    9 months ago
    No worries, this one is common. This is because final has both run_task calls upstream, and one of them gets SKIPPED, which then propagates the SKIP downstream. So you need to use an always_run trigger or set skip_on_upstream_skip=False in the @task decorator.
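The skip propagation described here can be sketched as a toy rule in plain Python. This is a simplified model under stated assumptions, not Prefect's actual runner logic: `resolve_state` is a hypothetical helper, and the state names are plain strings chosen for illustration.

```python
def resolve_state(upstream_states, skip_on_upstream_skip=True):
    """Toy rule: return the state a task ends in, given its upstream states."""
    if skip_on_upstream_skip and "Skipped" in upstream_states:
        # One skipped upstream is enough to skip this task as well.
        return "Skipped"
    return "Success"


# A downstream task whose upstream branches ended in different states.
upstream = ["Success", "Skipped"]

default_state = resolve_state(upstream)
opt_out_state = resolve_state(upstream, skip_on_upstream_skip=False)

print(default_state)  # Skipped
print(opt_out_state)  # Success
```

The model only covers the skip flag; how triggers interact with skipped upstream states is not modeled here.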
    Constantino Schillebeeckx

    9 months ago
    ah beautiful - sounds like I should submit a PR to update docs with an example like this?
    Kevin Kho

    9 months ago
    That’s always welcome! The conditional logic page would be good
    Constantino Schillebeeckx

    9 months ago
    you got it; least I can do for all the help you give me
    changed to
    @task(trigger=always_run)
    def final(anchor):
        logger = prefect.context.logger
        logger.critical(f"{anchor=}")
    and the logs show:
    └── 13:27:48 | INFO    | Task 'final': Starting task run...
    └── 13:27:48 | DEBUG   | Task 'final': Upstream states were skipped; ending run.
    └── 13:27:48 | DEBUG   | Task 'final': Handling state change from Pending to Skipped
    └── 13:27:48 | INFO    | Task 'final': Finished task run for task with final state: 'Skipped'
    with
    @task(skip_on_upstream_skip=False)
    def final(anchor):
        logger = prefect.context.logger
        logger.critical(f"{anchor=}")
    the task does run, but the input arg always evaluates to None (regardless of the case condition)
    Kevin Kho

    9 months ago
    Oh ok can test more in a bit
    The simple fix is this:
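    import prefect
    from prefect import Flow, Parameter, case, task
    from prefect.tasks.control_flow import merge
    from prefect.triggers import always_run

    @task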
    def check_cond():
        return True
    
    @task
    def run_task(anchor_date, days_from):
        logger = prefect.context.logger
        logger.info(f"{days_from=}")
        return days_from  # return a value so merge() has something to pass on
    
    @task(trigger=always_run)
    def final(anchor):
        logger = prefect.context.logger
        logger.info(f"{anchor=}")
    
    with Flow('foo') as flow:
    
        anchor_date = Parameter(name="anchor_date", default=None)
        cond = check_cond()
    
        with case(cond, True):
            z1 = run_task(anchor_date, days_from=0)
    
        with case(cond, False):
            z2 = run_task(anchor_date, days_from=-1)
    
        z = merge(z1,z2)
        final(z)
    I guess reusing the same variable name for both branches confuses the upstream dependencies. Will look a bit more
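As a rough picture of what merging two conditional branches achieves, here is a plain-Python sketch. It assumes each branch returns its days_from value and that a skipped branch contributes no result. `SKIPPED` and `merge_branches` are hypothetical names for this illustration, and the real merge task may handle edge cases differently.

```python
SKIPPED = object()  # hypothetical sentinel for "this branch did not run"


def merge_branches(*results):
    """Return the first result that came from a branch that actually ran."""
    for result in results:
        if result is not SKIPPED:
            return result
    return None  # every branch was skipped


# cond is True: only the first branch ran.
z_when_true = merge_branches(0, SKIPPED)
# cond is False: only the second branch ran.
z_when_false = merge_branches(SKIPPED, -1)

print(z_when_true)   # 0
print(z_when_false)  # -1
```

Giving each branch its own name (z1, z2) and merging them means the downstream task has a single upstream that always produces a value, whichever branch ran.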
    Constantino Schillebeeckx

    9 months ago
    If I step back a bit, ultimately I'd like to call the same task with different args based on some condition, e.g.
    run_task(anchor, days_from=-1 if some_cond else 0)
    the above doesn't work, so I went down this conditional case rabbit hole
    Kevin Kho

    9 months ago
    oof! then I think you really should just use an intermediate task that takes in True/False and returns the value you want, then pass that downstream as the arg, right?
    Constantino Schillebeeckx

    9 months ago
    @task
    def funky(anchor):
        if anchor == 'foo':
            return run_task.run(anchor, days_from=-1)
        else:
            return run_task.run(anchor, days_from=0)
    like so?
    Kevin Kho

    9 months ago
    No like:
    @task
    def funky(anchor):
        if anchor == 'foo':
            return 1
        else:
            return 0
    
    with Flow(...) as flow:
        x = Parameter("x", True)
        days = funky(x)
        run_task(anchor_date, days_from=days)
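The intermediate-task idea boils down to moving the conditional into a function that runs when the condition's value is actually known. The plain-Python sketch below shows the contrast. `Placeholder` and `choose_days_from` are hypothetical names, and the explanation of why the inline conditional fails is an assumption; the thread does not confirm the cause.

```python
class Placeholder:
    """Hypothetical stand-in for a task result that has not been computed yet."""


def choose_days_from(cond: bool) -> int:
    """Map the runtime condition to the argument the downstream task needs."""
    return -1 if cond else 0


# While the flow is being built, only a placeholder exists. Ordinary Python
# objects are truthy by default, so an inline conditional picks the first
# value no matter what the condition later turns out to be.
some_cond = Placeholder()
build_time_choice = -1 if some_cond else 0

# Evaluated at run time, with real booleans, the choice follows the condition.
run_time_choices = [choose_days_from(True), choose_days_from(False)]

print(build_time_choice)  # -1
print(run_time_choices)   # [-1, 0]
```

Wrapping a function like `choose_days_from` in a task, as suggested above, defers the decision to run time and keeps the flow to a single run_task call.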
    Constantino Schillebeeckx

    9 months ago
    I see, that's much more straightforward, thank you.