Constantino Schillebeeckx
12/17/2021, 5:52 PM
import prefect
from prefect import Flow, Parameter, case, task

@task
def check_cond():
    return False

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.critical(f"{days_from=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        anchor_date = run_task(anchor_date, days_from=0)
    with case(cond, False):
        anchor_date = run_task(anchor_date, days_from=-1)
However, the False condition never gets run. When check_cond returns True, the flow runs as expected. Am I missing something?
Kevin Kho
run_task is getting two dependencies and one is skipped, causing it to skip as well.
Kevin Kho
You reassigned anchor_date in the True condition, so in the case(cond, False) block that anchor_date has a different meaning because of the DAG construction. You need to change the name like this:
with case(cond, True):
    z = run_task(anchor_date, days_from=0)
with case(cond, False):
    z = run_task(anchor_date, days_from=-1)
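To see why reusing the name matters, here is a minimal pure-Python sketch. It does not use Prefect at all: `Node` and `call` are hypothetical stand-ins for a task call made inside a flow block. The sketch only records what each call receives as input, which is enough to show that rebinding `anchor_date` makes the second call depend on the first call's output instead of on the original parameter.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical stand-in for one task call in a DAG."""
    name: str
    upstream: List["Node"] = field(default_factory=list)


def call(name: str, *inputs: Node) -> Node:
    """Create a node whose upstream edges are the nodes passed as inputs."""
    return Node(name, list(inputs))


# Reusing the name: the second call receives the first call's output.
anchor_date = Node("anchor_date_param")
anchor_date = call("run_task_true", anchor_date)
rebound = call("run_task_false", anchor_date)

# Distinct names: both calls receive the original parameter.
param = Node("anchor_date_param")
z_true = call("run_task_true", param)
z_false = call("run_task_false", param)

print([n.name for n in rebound.upstream])  # ['run_task_true']
print([n.name for n in z_false.upstream])  # ['anchor_date_param']
```

In the first layout, a skip of the True branch reaches the False branch through that extra edge. In the second layout the two branches are independent of each other.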
Constantino Schillebeeckx
12/17/2021, 6:56 PM
Constantino Schillebeeckx
12/17/2021, 7:05 PM
@task
def check_cond():
    return True

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.critical(f"{days_from=}")
    return days_from

@task
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        z = run_task(anchor_date, days_from=0)
    with case(cond, False):
        z = run_task(anchor_date, days_from=-1)
    final(z)
Now, under the True condition, the task final gets skipped.
Kevin Kho
final has both run_task calls upstream, and one of them gets SKIPPED, which then forwards the SKIP. So you need to use an always_run trigger or set skip_on_upstream_skip=False in the @task decorator.
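The skip forwarding described above can be illustrated with a toy model. This is not Prefect's task runner: `ToyTask`, `run`, `build`, and the `force_skip` flag are hypothetical names invented for the sketch, and the model assumes that a skipped upstream contributes `None` as its result.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional

SKIPPED = "Skipped"
SUCCESS = "Success"


@dataclass
class ToyTask:
    name: str
    fn: Callable[..., object]
    upstream: List["ToyTask"] = field(default_factory=list)
    skip_on_upstream_skip: bool = True
    force_skip: bool = False  # models a branch whose case() did not match
    state: Optional[str] = None
    result: object = None


def run(task: ToyTask) -> str:
    """Run upstream tasks first, then decide whether this task runs or skips."""
    for up in task.upstream:
        if up.state is None:
            run(up)
    upstream_skipped = any(up.state == SKIPPED for up in task.upstream)
    if task.force_skip or (task.skip_on_upstream_skip and upstream_skipped):
        task.state = SKIPPED
        return task.state
    # Assumption of this toy model: a skipped upstream passes None downstream.
    task.result = task.fn(*[up.result for up in task.upstream])
    task.state = SUCCESS
    return task.state


def build(skip_on_upstream_skip: bool) -> ToyTask:
    """Build a two-task chain: a skipped branch feeding a final task."""
    branch = ToyTask("run_task_false", lambda: -1, force_skip=True)
    return ToyTask("final", lambda anchor: anchor, [branch], skip_on_upstream_skip)


default_final = build(True)
lenient_final = build(False)
print(run(default_final))                        # Skipped
print(run(lenient_final), lenient_final.result)  # Success None
```

In this model, turning off the skip forwarding lets the downstream task run, but it still has no real value to work with because the skipped branch never produced one.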
Constantino Schillebeeckx
12/17/2021, 7:10 PM
Kevin Kho
Constantino Schillebeeckx
12/17/2021, 7:10 PM
Constantino Schillebeeckx
12/17/2021, 7:29 PM
@task(trigger=always_run)
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")
and the logs show:
└── 13:27:48 | INFO | Task 'final': Starting task run...
└── 13:27:48 | DEBUG | Task 'final': Upstream states were skipped; ending run.
└── 13:27:48 | DEBUG | Task 'final': Handling state change from Pending to Skipped
└── 13:27:48 | INFO | Task 'final': Finished task run for task with final state: 'Skipped'
Constantino Schillebeeckx
12/17/2021, 7:30 PM
@task(skip_on_upstream_skip=False)
def final(anchor):
    logger = prefect.context.logger
    logger.critical(f"{anchor=}")
The task does run, but the input arg always evaluates to None (regardless of the case condition).
Kevin Kho
Kevin Kho
import prefect
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge
from prefect.triggers import always_run

@task
def check_cond():
    return True

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.info(f"{days_from=}")
    # Return the value so merge has a result to pass on to final
    return days_from

@task(trigger=always_run)
def final(anchor):
    logger = prefect.context.logger
    logger.info(f"{anchor=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        z1 = run_task(anchor_date, days_from=0)
    with case(cond, False):
        z2 = run_task(anchor_date, days_from=-1)
    # Join the two branches; only one of z1/z2 actually runs
    z = merge(z1, z2)
    final(z)
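As a rough illustration of what joining the branches achieves, here is a pure-Python sketch. It assumes merge-like behavior of "take the first branch result that is not None" and that run_task returns days_from, as in the earlier version of the flow in this thread. `toy_merge` and `run_flow` are hypothetical names, not Prefect functions.

```python
from typing import Optional


def toy_merge(*branch_results: Optional[int]) -> Optional[int]:
    """Return the first branch result that is not None, else None."""
    return next((r for r in branch_results if r is not None), None)


def run_flow(cond: bool) -> Optional[int]:
    # Only the branch whose case matches produces a value;
    # the other branch is skipped and modeled here as None.
    z1 = 0 if cond is True else None
    z2 = -1 if cond is False else None
    return toy_merge(z1, z2)


print(run_flow(True))   # 0
print(run_flow(False))  # -1
```

The sketch tests `is not None` rather than truthiness on purpose: the True branch returns 0, which is falsy, and a truthiness check would wrongly discard it.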
Kevin Kho
Constantino Schillebeeckx
12/17/2021, 8:24 PM
args based on some condition, e.g.
run_task(anchor, days_from=-1 if some_cond else 0)
The above doesn't work, so I went down this conditional case rabbit hole.
Kevin Kho
Constantino Schillebeeckx
12/17/2021, 8:28 PM
@task
def funky(anchor):
    if anchor == 'foo':
        return run_task.run(anchor, days_from=-1)
    else:
        return run_task.run(anchor, days_from=0)
Like so?
Kevin Kho
@task
def funky(anchor):
    if anchor == 'foo':
        return 1
    else:
        return 0

with Flow(...) as flow:
    x = Parameter("x", True)
    # Compute the argument in a task, then pass its result along
    days = funky(x)
    run_task(x, days_from=days)
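One plausible reading of why the inline `-1 if some_cond else 0` did not behave as hoped is that the expression is evaluated while the flow is being defined, before any task has produced a value. The sketch below shows that timing difference in plain Python. `ToyTaskResult` and `choose_days` are hypothetical names, and the sketch makes no claim about how Prefect itself treats a task in a boolean context.

```python
class ToyTaskResult:
    """Hypothetical placeholder for a task's future result at build time."""

    def __init__(self, fn):
        self.fn = fn

    def resolve(self):
        """Produce the actual value, as would happen at run time."""
        return self.fn()


some_cond = ToyTaskResult(lambda: False)

# Build time: the placeholder object itself is truthy, so this picks -1
# no matter what the condition later resolves to.
days_at_build = -1 if some_cond else 0


def choose_days(cond_value: bool) -> int:
    """Make the decision on the resolved value instead of the placeholder."""
    return -1 if cond_value else 0


# Run time: the decision is made after the value exists.
days_at_run = choose_days(some_cond.resolve())

print(days_at_build)  # -1
print(days_at_run)    # 0
```

Moving the branching into a task, as in the snippet above, has the same effect as `choose_days` here: the condition is inspected only once its real value is available.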
Constantino Schillebeeckx
12/17/2021, 8:36 PM