Aleksandr Liadov

01/23/2023, 2:17 PM
Hello, how can I do the following: if an error is raised in a subflow, I want a cleanup task to run in the parent flow. Can I do this without run_deployment? I want to deploy only the parent flow, not the subflows. allow_failure doesn't work. Code showing what I want is in the thread.
from prefect import task, flow, allow_failure
import random


@task
def task_1():
    print("task 1 succeeded")


@task
def task_2():
    print("task 2 succeeded")


@task
def task_3():
    print("task 3 succeeded")


@task
def task_4():
    print("This task often fails")
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")
    else:
        print("task 4 succeeded")


@task
def task_5():
    print("task 5 succeeded")


@task
def clean_up_task():
    print("Cleaning up 🧹")


@flow
def first_subflow():
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three])


@flow
def second_subflow():
    five = task_5.submit()


@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    _first_subflow = first_subflow()
    _second_subflow = second_subflow()
    clean_up_task.submit(
        wait_for=[allow_failure(_first_subflow), allow_failure(_second_subflow)]
    )


if __name__ == "__main__":
    main()

Ryan Peden

01/23/2023, 2:31 PM
Would something like this do what you need? It catches the error and runs the cleanup task:
from prefect import task, flow, allow_failure
import random


@task
def task_1():
    print("task 1 succeeded")


@task
def task_2():
    print("task 2 succeeded")


@task
def task_3():
    print("task 3 succeeded")


@task
def task_4():
    print("This task often fails")
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")
    else:
        print("task 4 succeeded")


@task
def task_5():
    print("task 5 succeeded")


@task
def clean_up_task():
    print("Cleaning up 🧹")


@flow
def first_subflow():
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = allow_failure(task_4.submit(wait_for=[one, two, three]))


@flow
def second_subflow():
    five = task_5.submit()


@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    try:
        _first_subflow = first_subflow()
        _second_subflow = second_subflow()
    except ValueError:
        clean_up_task.submit()


if __name__ == "__main__":
    main()

Aleksandr Liadov

01/23/2023, 2:37 PM
@Ryan Peden yes, that could do what we need, thanks! However, is there a way to tell Prefect explicitly that this subflow is allowed to fail, so that it doesn't crash the parent flow? I want to check the subflow's state and run the cleanup if the state is failed.
What I'd like to have is something like the snippet below. @Ryan Peden do you think it's possible?
_first_subflow = allow_failure(first_subflow().submit())
if _first_subflow.wait() == "Failed":
    clean_up_task.submit()

Ryan Peden

01/23/2023, 3:07 PM
Yes, I think so! How about this:
from prefect import task, flow, allow_failure, get_run_logger
import random


@task
def task_1():
    print("task 1 succeeded")


@task
def task_2():
    print("task 2 succeeded")


@task
def task_3():
    print("task 3 succeeded")


@task
def task_4():
    print("This task often fails")
    if random.random() > 0.0:
        raise ValueError("Non-deterministic error has occurred.")
    else:
        print("task 4 succeeded")


@task
def task_5():
    print("task 5 succeeded")


@task
def clean_up_task():
    print("Cleaning up 🧹")


@flow
def first_subflow():
    get_run_logger()
    one = task_1.submit()
    two = task_2.submit()
    three = task_3.submit()
    four = task_4.submit(wait_for=[one, two, three], return_state=True)
    if four.is_failed():
        return False

    return True


@flow
def second_subflow():
    five = task_5.submit()


@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
    success = first_subflow()

    if not success:
        clean_up_task.submit()

    # you could return early or carry on if your second subflow
    # can still work after task_4 fails and clean_up_task runs
    _second_subflow = second_subflow()


if __name__ == "__main__":
    main()
(I changed the threshold in task 4 to 0.0 so that it always fails, which makes it easier to test failures) 🙂

Aleksandr Liadov

01/23/2023, 3:19 PM
Thanks for your help, @Ryan Peden! I think I have found a solution:
_first_subflow = first_subflow(return_state=True)
if _first_subflow.type == StateType.FAILED:
    clean_up_task.submit()

Ryan Peden

01/23/2023, 3:23 PM
Excellent - I should have thought of that. 🙂 I'm happy you found what you needed!