https://prefect.io logo
c

Constantino Schillebeeckx

12/17/2021, 5:52 PM
I'm trying to do a conditional flow like the one shown below:
Copy code
@task
def check_cond():
    return False


@task
def run_task(anchor_date, days_from):

    logger = prefect.context.logger
    logger.critical(f"{days_from=}")


with Flow('foo') as flow:

    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()

    with case(cond, True):
        anchor_date = run_task(anchor_date, days_from=0)

    with case(cond, False):
        anchor_date = run_task(anchor_date, days_from=-1)
however the
False
condition never gets run. When
check_cond
returns
True
- the flows runs as expected. Am I missing something?
k

Kevin Kho

12/17/2021, 6:07 PM
I am looking into it. I think the second
run_task
is getting two dependencies and one is skipped, causing it to skip also.
Ok I think I understand it now. The issue is you overwrite
anchor_date
in the True condition so in the
case(cond,False)
block, that
anchor_date
has a different meaning because of the DAG construction. You need to change the name like this:
Copy code
with case(cond, True):
        z = run_task(anchor_date, days_from=0)

    with case(cond, False):
        z = run_task(anchor_date, days_from=-1)
c

Constantino Schillebeeckx

12/17/2021, 6:56 PM
Beautiful, you're a life safer
sorry, I'm still having issues with this:
Copy code
@task
def check_cond():
    return True


@task
def run_task(anchor_date, days_from):

    logger = prefect.context.logger
    logger.critical(f"{days_from=}")

    return days_from


@task
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")


with Flow('foo') as flow:

    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()

    with case(cond, True):
        z = run_task(anchor_date, days_from=0)

    with case(cond, False):
        z = run_task(anchor_date, days_from=-1)

    final(z)
now under the
True
condition, task final gets skipped
k

Kevin Kho

12/17/2021, 7:07 PM
No worries, this one is common. This is because
final
relies has an upstream on both
run_task
calls and one of them gets SKIPPED which then forwards the SKIP. So you need to use an
always_run
trigger or do
skip_on_upstream_skip=False
in the
@task
c

Constantino Schillebeeckx

12/17/2021, 7:10 PM
ah beautiful - sounds like I should submit a PR to update docs with an example like this?
k

Kevin Kho

12/17/2021, 7:10 PM
That’s always welcome! The conditional logic page would be good
c

Constantino Schillebeeckx

12/17/2021, 7:10 PM
you got it; least I can do for all the help you give me
changed to
Copy code
@task(trigger=always_run)
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")
and the logs show:
Copy code
└── 13:27:48 | INFO    | Task 'final': Starting task run...
└── 13:27:48 | DEBUG   | Task 'final': Upstream states were skipped; ending run.
└── 13:27:48 | DEBUG   | Task 'final': Handling state change from Pending to Skipped
└── 13:27:48 | INFO    | Task 'final': Finished task run for task with final state: 'Skipped'
with
Copy code
@task(skip_on_upstream_skip=False)
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")
the task does run, but the input arg always evaluates to
None
(regardless of the case condition)
k

Kevin Kho

12/17/2021, 7:31 PM
Oh ok can test more in a bit
The simple fix is this:
Copy code
@task
def check_cond():
    return True

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    <http://logger.info|logger.info>(f"{days_from=}")

@task(trigger=always_run)
def final(anchor):
    logger = prefect.context.logger
    <http://logger.info|logger.info>(f"{anchor=}")

with Flow('foo') as flow:

    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()

    with case(cond, True):
        z1 = run_task(anchor_date, days_from=0)

    with case(cond, False):
        z2 = run_task(anchor_date, days_from=-1)

    z = merge(z1,z2)
    final(z)
I guess using the same variable causes weirdness with upstream tasks. Will look a bit more
c

Constantino Schillebeeckx

12/17/2021, 8:24 PM
If I step back a bit, ultimately I'd like to call the same task with different
args
based on some condition. e.g.
Copy code
run_task(anchor, days_from=-1 if some_cond else 0)
the above doesn't work, so i went down this conditional
case
rabbit hole
k

Kevin Kho

12/17/2021, 8:27 PM
oof! then I think you really should just use an intermediate task that takes in True/False and then return the value you want and pass it downstream to the args right?
c

Constantino Schillebeeckx

12/17/2021, 8:28 PM
Copy code
@task
def funky(anchor):
  if anchor == 'foo':
    return run_task.run(anchor, days_from=-1)
   else:
     return run_task.run(anchor, days_from=0)
like so?
k

Kevin Kho

12/17/2021, 8:30 PM
No like:
Copy code
@task
def funky(anchor):
  if anchor == 'foo':
     return 1
   else:
     return 0

with Flow(...) as flow:
    x = Parameter("x", True)
    days = funky(x)
    run_task(anchor_date, days_from=days)
c

Constantino Schillebeeckx

12/17/2021, 8:36 PM
I see, that's much more straight forward, thank you.
3 Views