Igor Bondartsov

    Igor Bondartsov

    1 year ago
    Hello! I am confused: my flow failed in a positive case. I have three tasks Parent, ChildSuccess, ChildFailed. Childs depend on Parent. ChildFailed with trigger rule any_failed. When Parent- Success and ChildSuccess - success my flow Failed - I don't understand why? I thought that ChaildFailed should be with state skipped but it failed.
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey @Igor Bondartsov, do you have a small code snippet you can share?
    Igor Bondartsov

    Igor Bondartsov

    1 year ago
    @Kevin Kho import prefect from prefect import task, Flow, Parameter from prefect.triggers import any_failed from prefect.executors import LocalDaskExecutor @task def parent_task(should_fail): if should_fail: raise Exception("should_fail set to true") @task def error_handler(trigger=any_failed): logger = prefect.context.get("logger") logger.info("Handled task error") @task def success_handler(): logger = prefect.context.get("logger") logger.info("Handled task sucess") with Flow("TestFlow") as flow: should_fail = Parameter("should_fail", default=False) parent = parent_task(should_fail) error_handler(upstream_tasks=[parent_task]) success_handler(upstream_tasks=[parent_task]) flow.executor = LocalDaskExecutor() if name == "main": flow.run()
    Kevin Kho

    Kevin Kho

    1 year ago
    Looking into this
    So there are two things here, first is that the
    trigger=any_failed
    should go inside the
    @task(trigger=any_failed)
    . And then you should change
    error_handler(upstream_tasks=[parent_task])
    to
    error_handler(upstream_tasks=[parent])
    so that it depends on the result of
    parent
    . After that, the flow triggers will work as expected. You are right that this flow will always be a Failure. You can fix this by following the example here and setting
    success
    as the reference task so that the Flow success/failure depends on that task and not the
    error
    task. You can also put the logic of the failed task inside a
    state_handler
    and attach it to the task that can fail. Docs on that here