David Elliott
10/27/2022, 1:22 PM
allow_failure() wrapper for the KubernetesJob infra - it seems to have no effect when running with SequentialTaskRunner or ConcurrentTaskRunner. When I run the same flow locally it works as expected (downstream tasks are run), but when it's run on k8s (with KubernetesJob infrastructure) it ignores the wrapper and the downstream tasks just go to NotReady as if the allow_failure() wrapper weren't used at all. MRE in 🧵
# Prefect version 2.6.1
from prefect import flow, task, allow_failure, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
import time


@task()
def sql_task(task_name: str):
    logger = get_run_logger()
    logger.info(f"RUNNING: {task_name}")
    time.sleep(3)
    # task_2 fails on purpose so the effect of allow_failure() on task_3 is visible
    if task_name == "task_2":
        raise Exception("failed task")


@flow(name="demo_11", task_runner=ConcurrentTaskRunner())
def demo_11():
    logger = get_run_logger()
    logger.info("Flow now starting")
    # Each task waits on the previous one, wrapped in allow_failure()
    task_1 = sql_task.with_options(name='task_1').submit(task_name='task_1', wait_for=[])
    task_2 = sql_task.with_options(name='task_2').submit(task_name='task_2', wait_for=[allow_failure(task_1)])
    task_3 = sql_task.with_options(name='task_3').submit(task_name='task_3', wait_for=[allow_failure(task_2)])


if __name__ == "__main__":
    demo_11()
Anna Geller
10/27/2022, 1:40 PM
David Elliott
10/27/2022, 1:42 PM