Aleksandr Liadov
01/23/2023, 2:17 PMrun_deployment
? I want to deploy only parent flow, and not subflows.
allow_failure
doesnt work.
Code to show what I want in threads.from prefect import task, flow, allow_failure
import random
@task
def task_1():
print("task 1 succeeded")
@task
def task_2():
print("task 2 succeeded")
@task
def task_3():
print("task 3 succeeded")
@task
def task_4():
print("This task often fails")
if random.random() > 0.5:
raise ValueError("Non-deterministic error has occured.")
else:
print("task 4 succeeded")
@task
def task_5():
print("task 5 succeeded")
@task
def clean_up_task():
print("Cleaning up 🧹")
@flow
def first_subflow():
one = task_1.submit()
two = task_2.submit()
three = task_3.submit()
four = task_4.submit(wait_for=[one, two, three])
@flow
def second_subflow():
five = task_5.submit()
@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
_first_subflow = first_subflow()
_second_subflow = second_subflow()
clean_up_task.submit(
wait_for=[allow_failure(_first_subflow), allow_failure(_second_subflow)]
)
if __name__ == "__main__":
main()
Ryan Peden
01/23/2023, 2:31 PMfrom prefect import task, flow, allow_failure
import random
@task
def task_1():
print("task 1 succeeded")
@task
def task_2():
print("task 2 succeeded")
@task
def task_3():
print("task 3 succeeded")
@task
def task_4():
print("This task often fails")
if random.random() > 0.5:
raise ValueError("Non-deterministic error has occured.")
else:
print("task 4 succeeded")
@task
def task_5():
print("task 5 succeeded")
@task
def clean_up_task():
print("Cleaning up 🧹")
@flow
def first_subflow():
one = task_1.submit()
two = task_2.submit()
three = task_3.submit()
four = allow_failure(task_4.submit(wait_for=[one, two, three]))
@flow
def second_subflow():
five = task_5.submit()
@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
try:
_first_subflow = first_subflow()
_second_subflow = second_subflow()
except ValueError:
clean_up_task.submit()
if __name__ == "__main__":
main()
Aleksandr Liadov
01/23/2023, 2:37 PM_first_subflow = allow_failure(first_subflow().submit())
if _first_subflow.wait() == "Failed":
clean_up_task.submit()
Ryan Peden
01/23/2023, 3:07 PMfrom prefect import task, flow, allow_failure, get_run_logger
import random
@task
def task_1():
print("task 1 succeeded")
@task
def task_2():
print("task 2 succeeded")
@task
def task_3():
print("task 3 succeeded")
@task
def task_4():
print("This task often fails")
if random.random() > 0.0:
raise ValueError("Non-deterministic error has occured.")
else:
print("task 4 succeeded")
@task
def task_5():
print("task 5 succeeded")
@task
def clean_up_task():
print("Cleaning up 🧹")
@flow
def first_subflow():
get_run_logger()
one = task_1.submit()
two = task_2.submit()
three = task_3.submit()
four = task_4.submit(wait_for=[one, two, three], return_state=True)
if four.is_failed():
return False
return True
@flow
def second_subflow():
five = task_5.submit()
@flow(log_prints=True, name="Cleanup task may not get executed")
def main():
success = _first_subflow = first_subflow()
if not success:
clean_up_task.submit()
# you could return early or carry on if your second subflow
# can still work after task_4 fails and clean_up_task runs
_second_subflow = second_subflow()
if __name__ == "__main__":
main()
Aleksandr Liadov
01/23/2023, 3:19 PM_first_subflow = first_subflow(return_state=True)
if _first_subflow.type == StateType.FAILED:
clean_up_task.submit()
Ryan Peden
01/23/2023, 3:23 PM