David Elliott
10/27/2022, 1:22 PMallow_failure() wrapper for the KubernetesJob infra - it seems to have no effect when running with SequentialTaskRunner or ConcurrentTaskRunner . When I run the same flow locally it works as expected (downstream tasks are ran), but when it’s ran on k8s (with KubernetesJob infrastructure) it ignores the wrapper and the downstream tasks just go to NotReady as if the allow_failure() wrapper weren’t used at all. MRE in 🧵David Elliott
10/27/2022, 1:22 PM# Prefect version 2.6.1
from prefect import flow, task, allow_failure, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
import time
@task()
def sql_task(task_name: str):
logger = get_run_logger()
<http://logger.info|logger.info>(f"RUNNING: {task_name}")
time.sleep(3)
if task_name == "task_2":
raise Exception("failed task")
@flow(name="demo_11", task_runner=ConcurrentTaskRunner())
def demo_11():
logger = get_run_logger()
<http://logger.info|logger.info>(f"Flow now starting")
task_1 = sql_task.with_options(name='task_1').submit(task_name='task_1', wait_for=[])
task_2 = sql_task.with_options(name='task_2').submit(task_name='task_2', wait_for=[allow_failure(task_1)])
task_3 = sql_task.with_options(name='task_3').submit(task_name='task_3', wait_for=[allow_failure(task_2)])
if __name__ == "__main__":
demo_11()David Elliott
10/27/2022, 1:23 PMDavid Elliott
10/27/2022, 1:23 PMAnna Geller
David Elliott
10/27/2022, 1:42 PMDavid Elliott
10/27/2022, 1:51 PM