infra - it seems to have no effect when running with
SequentialTaskRunner
or
ConcurrentTaskRunner
. When I run the same flow locally it works as expected (downstream tasks are ran), but when it’s ran on k8s (with
KubernetesJob
infrastructure) it ignores the wrapper and the downstream tasks just go to
NotReady
as if the
allow_failure()
wrapper weren’t used at all. MRE in 🧵
gratitude thank you 1
✅ 1
David Elliott
10/27/2022, 1:22 PM
Copy code
# Prefect version 2.6.1
from prefect import flow, task, allow_failure, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
import time
@task()
def sql_task(task_name: str):
logger = get_run_logger()
<http://logger.info|logger.info>(f"RUNNING: {task_name}")
time.sleep(3)
if task_name == "task_2":
raise Exception("failed task")
@flow(name="demo_11", task_runner=ConcurrentTaskRunner())
def demo_11():
logger = get_run_logger()
<http://logger.info|logger.info>(f"Flow now starting")
task_1 = sql_task.with_options(name='task_1').submit(task_name='task_1', wait_for=[])
task_2 = sql_task.with_options(name='task_2').submit(task_name='task_2', wait_for=[allow_failure(task_1)])
task_3 = sql_task.with_options(name='task_3').submit(task_name='task_3', wait_for=[allow_failure(task_2)])
if __name__ == "__main__":
demo_11()
David Elliott
10/27/2022, 1:23 PM
Flow is added to a docker image which is stored in ECR, then we deploy a k8s agent with that same ECR image which listens for flow runs and creates k8s jobs per the KubernetesJob infra.
Deployment file below:
thanks a lot for reporting this! 🙌
Are you open to transferring this to a GitHub issue? it's easier to keep track of/prioritize/submit PRs with fixes when bug reports are submitted via GH issues
d
David Elliott
10/27/2022, 1:42 PM
Sure, will raise one now (and you can let me know if I added enough info!)
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.