Constantino Schillebeeckx
12/17/2021, 5:52 PM
@task
def check_cond():
    return False

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.critical(f"{days_from=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        anchor_date = run_task(anchor_date, days_from=0)
    with case(cond, False):
        anchor_date = run_task(anchor_date, days_from=-1)
however the False condition never gets run. When check_cond returns True, the flow runs as expected. Am I missing something?
Kevin Kho
run_task is getting two dependencies and one is skipped, causing it to skip also.
Kevin Kho
anchor_date is reassigned in the True condition, so in the case(cond, False) block that anchor_date has a different meaning because of the DAG construction. You need to change the name like this:
with case(cond, True):
    z = run_task(anchor_date, days_from=0)
with case(cond, False):
    z = run_task(anchor_date, days_from=-1)
Constantino Schillebeeckx
12/17/2021, 6:56 PM
Constantino Schillebeeckx
12/17/2021, 7:05 PM
@task
def check_cond():
    return True

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.critical(f"{days_from=}")
    return days_from

@task
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        z = run_task(anchor_date, days_from=0)
    with case(cond, False):
        z = run_task(anchor_date, days_from=-1)
    final(z)
now under the True condition, task final gets skipped
Kevin Kho
final has an upstream dependency on both run_task calls, and one of them gets SKIPPED, which then forwards the SKIP. So you need to use an always_run trigger or set skip_on_upstream_skip=False in the @task decorator.
Constantino Schillebeeckx
12/17/2021, 7:10 PM
Kevin Kho
Constantino Schillebeeckx
12/17/2021, 7:10 PM
Constantino Schillebeeckx
12/17/2021, 7:29 PM
@task(trigger=always_run)
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")
and the logs show:
└── 13:27:48 | INFO | Task 'final': Starting task run...
└── 13:27:48 | DEBUG | Task 'final': Upstream states were skipped; ending run.
└── 13:27:48 | DEBUG | Task 'final': Handling state change from Pending to Skipped
└── 13:27:48 | INFO | Task 'final': Finished task run for task with final state: 'Skipped'
Constantino Schillebeeckx
12/17/2021, 7:30 PM
@task(skip_on_upstream_skip=False)
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")
the task does run, but the input arg always evaluates to None (regardless of the case condition)
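The skip behaviour described in this thread can be illustrated with a small pure-Python model. This is a hedged sketch, not Prefect code: ToyTask, force_skip, and simulate are hypothetical names invented here, and the model assumes that final receives only the task last bound to z and that a skipped upstream passes None downstream.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional, Tuple

SKIPPED = "Skipped"
SUCCESS = "Success"


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task node; not Prefect's Task class."""

    name: str
    fn: Callable[..., Any]
    upstream: List["ToyTask"] = field(default_factory=list)
    skip_on_upstream_skip: bool = True
    force_skip: bool = False  # stands in for a case branch that did not match
    state: Optional[str] = None
    result: Any = None

    def run(self) -> None:
        # A branch whose case did not match is skipped outright.
        if self.force_skip:
            self.state = SKIPPED
            return
        # By default, a skipped upstream forwards the skip downstream.
        if self.skip_on_upstream_skip and any(
            u.state == SKIPPED for u in self.upstream
        ):
            self.state = SKIPPED
            return
        # Assumption: a skipped upstream contributes None as its result.
        self.result = self.fn(*[u.result for u in self.upstream])
        self.state = SUCCESS


def simulate(cond: bool, skip_on_upstream_skip: bool) -> Tuple[str, Any]:
    """Run the toy two-branch flow and report the state and input of final."""
    true_branch = ToyTask("run_task(days_from=0)", lambda: 0, force_skip=not cond)
    false_branch = ToyTask("run_task(days_from=-1)", lambda: -1, force_skip=cond)
    z = true_branch
    z = false_branch  # rebinding: z now refers only to the False branch
    final = ToyTask(
        "final",
        lambda anchor: anchor,
        upstream=[z],
        skip_on_upstream_skip=skip_on_upstream_skip,
    )
    for task in (true_branch, false_branch, final):
        task.run()
    return final.state, final.result
```

Under these assumptions, simulate(True, True) leaves final in the Skipped state, while simulate(True, False) lets final run but hands it None, because the only task bound to z was itself skipped.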
Kevin Kho
import prefect
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge
from prefect.triggers import always_run

@task
def check_cond():
    return True

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.info(f"{days_from=}")
    return days_from  # return a value so merge has a result to pass on

@task(trigger=always_run)
def final(anchor):
    logger = prefect.context.logger
    logger.info(f"{anchor=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        z1 = run_task(anchor_date, days_from=0)
    with case(cond, False):
        z2 = run_task(anchor_date, days_from=-1)
    # merge joins the two branches back into a single result
    z = merge(z1, z2)
    final(z)
Kevin Kho
Constantino Schillebeeckx
12/17/2021, 8:24 PM
args based on some condition. e.g.
run_task(anchor, days_from=-1 if some_cond else 0)
the above doesn't work, so I went down this conditional case rabbit hole
Kevin Kho
Constantino Schillebeeckx
12/17/2021, 8:28 PM
@task
def funky(anchor):
    if anchor == 'foo':
        return run_task.run(anchor, days_from=-1)
    else:
        return run_task.run(anchor, days_from=0)
like so?
Kevin Kho
@task
def funky(anchor):
    if anchor == 'foo':
        return 1
    else:
        return 0

with Flow(...) as flow:
    x = Parameter("x", True)
    days = funky(x)
    run_task(anchor_date, days_from=days)
Constantino Schillebeeckx
12/17/2021, 8:36 PM
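The run_task(anchor, days_from=-1 if some_cond else 0) attempt and the funky(x) suggestion differ in when the choice is made. A minimal pure-Python sketch of that distinction follows. Deferred, resolve, pick_days, and run_flow are hypothetical names, not Prefect APIs, and the sketch assumes the condition is a placeholder object whose real value is only known at run time. Prefect's own Task class may behave differently in a boolean context.

```python
from typing import Any, Dict


class Deferred:
    """Hypothetical placeholder for a value that is only known at run time."""

    def __init__(self, name: str) -> None:
        self.name = name


def resolve(value: Any, runtime_values: Dict[str, Any]) -> Any:
    """Swap a placeholder for its run-time value; pass plain values through."""
    if isinstance(value, Deferred):
        return runtime_values[value.name]
    return value


some_cond = Deferred("some_cond")

# Build time: Python evaluates the conditional expression immediately.
# An ordinary object is truthy, so this picks -1 before any run happens.
days_at_build = -1 if some_cond else 0


def pick_days(cond: bool) -> int:
    """Make the choice inside a function so it happens at run time."""
    return -1 if cond else 0


def run_flow(runtime_values: Dict[str, Any]) -> int:
    # Run time: resolve the placeholder first, then choose days_from.
    return pick_days(resolve(some_cond, runtime_values))
```

In this model days_at_build is fixed at -1 whatever the run-time value turns out to be, whereas run_flow({"some_cond": False}) returns 0. That is the reason for moving the choice into its own task and passing its result as days_from.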