David Elliott

10/27/2022, 1:22 PM
Hey folks, I believe I’ve found a bug with the `allow_failure()` wrapper for the `KubernetesJob` infra: it seems to have no effect when running with `SequentialTaskRunner` or `ConcurrentTaskRunner`. When I run the same flow locally it works as expected (downstream tasks are run), but when it’s run on k8s (with `KubernetesJob` infrastructure) it ignores the wrapper and the downstream tasks just go to `NotReady`, as if the `allow_failure()` wrapper weren’t used at all. MRE in 🧵
# Prefect version 2.6.1

from prefect import flow, task, allow_failure, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
import time

@task()
def sql_task(task_name: str):
    logger = get_run_logger()
    logger.info(f"RUNNING: {task_name}")
    time.sleep(3)
    if task_name == "task_2":
        raise Exception("failed task")

@flow(name="demo_11", task_runner=ConcurrentTaskRunner())
def demo_11():
    logger = get_run_logger()
    logger.info("Flow now starting")
    task_1 = sql_task.with_options(name='task_1').submit(task_name='task_1', wait_for=[])
    task_2 = sql_task.with_options(name='task_2').submit(task_name='task_2', wait_for=[allow_failure(task_1)])
    task_3 = sql_task.with_options(name='task_3').submit(task_name='task_3', wait_for=[allow_failure(task_2)])

if __name__ == "__main__":
    demo_11()
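For anyone reading along without a cluster handy, here is a toy model of the behaviour the MRE above is expected to show. It is a plain-Python sketch, not Prefect's implementation: `AllowFailure`, `resolve`, `ok` and `boom` are hypothetical names made up for illustration, and the state strings are only labels chosen to mirror the ones mentioned in this thread.

```python
# Toy model only: this does NOT use or reproduce Prefect internals.
from dataclasses import dataclass


@dataclass(frozen=True)
class AllowFailure:
    """Hypothetical stand-in for allow_failure(): marks an upstream as tolerated."""
    name: str


def resolve(tasks):
    """Run (name, fn, wait_for) triples in order and return a name -> state dict."""
    states = {}
    for name, fn, wait_for in tasks:
        blocked = False
        for dep in wait_for:
            tolerated = isinstance(dep, AllowFailure)
            dep_name = dep.name if tolerated else dep
            # An upstream that did not complete blocks us unless it was wrapped.
            if states[dep_name] != "Completed" and not tolerated:
                blocked = True
        if blocked:
            states[name] = "NotReady"
            continue
        try:
            fn()
            states[name] = "Completed"
        except Exception:
            states[name] = "Failed"
    return states


def ok():
    pass


def boom():
    raise Exception("failed task")


# Same shape as the MRE: task_2 fails, task_3 waits on it.
with_wrapper = resolve([
    ("task_1", ok, []),
    ("task_2", boom, [AllowFailure("task_1")]),
    ("task_3", ok, [AllowFailure("task_2")]),
])
without_wrapper = resolve([
    ("task_1", ok, []),
    ("task_2", boom, ["task_1"]),
    ("task_3", ok, ["task_2"]),
])
print(with_wrapper)
print(without_wrapper)
```

In this model `task_3` still runs when its failed upstream is wrapped, and is left as `NotReady` when it is not. The second outcome is what the k8s runs described above appear to produce even with the wrapper in place.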
The flow is added to a Docker image which is stored in ECR. We then deploy a k8s agent with that same ECR image; the agent listens for flow runs and creates k8s jobs per the `KubernetesJob` infra. Deployment file below:

Anna Geller

10/27/2022, 1:40 PM
Thanks a lot for reporting this! 🙌 Are you open to transferring this to a GitHub issue? It's easier to keep track of, prioritize, and submit PRs with fixes when bug reports are submitted via GH issues.

David Elliott

10/27/2022, 1:42 PM
Sure, will raise one now (and you can let me know if I added enough info!)