Lee Mendelowitz
03/01/2023, 1:41 PMfrom prefect.states import Completed, Failed
from prefect import task, flow
@flow
def flow_that_fails():
failed_values = ['e', 'f', 'g']
return Failed(message = 'some flows failed', data = failed_values)
@flow
def my_flow():
result = flow_that_fails(return_state = True)
data = result.result(raise_on_failure = False)
print(data)
my_flow()
gives:
TypeError: Unexpected result for failed state: ['e', 'f', 'g'] —— list cannot be resolved into an exception
I’m on prefect 2.8.3Christopher Boyd
03/01/2023, 2:07 PMfrom prefect import flow, task
@task
def my_task():
raise ValueError()
@flow
def my_flow():
try:
my_task()
except ValueError:
print("Oh no! The task failed.")
return True
my_flow()
None
), its state is determined by the states of all of the tasks and subflows within it.
• If any task run or subflow run failed, then the final flow run state is marked as FAILED
.
• If any task run was cancelled, then the final flow run state is marked as CANCELLED
.
• If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state.
• If the flow run returns any other object, then it is marked as completed.Lee Mendelowitz
03/01/2023, 2:17 PMChristopher Boyd
03/01/2023, 2:20 PMLee Mendelowitz
03/01/2023, 2:21 PMChristopher Boyd
03/01/2023, 2:21 PMfrom prefect import flow, task
@task
def always_fails_task():
raise ValueError("I fail successfully")
@task
def always_succeeds_task():
print("I'm fail safe!")
return "success"
@flow
def always_succeeds_flow():
x = always_fails_task.submit().result(raise_on_failure=False)
y = always_succeeds_task.submit(wait_for=[x])
return y
if __name__ == "__main__":
always_succeeds_flow()
Lee Mendelowitz
03/01/2023, 2:23 PMChristopher Boyd
03/01/2023, 2:25 PMZanie
03/01/2023, 4:38 PMreturn Failed(message = ‘some flows failed’, data = failed_values)
data
for Failed
states needs to be an exception typefrom prefect import flow, task
from prefect.states import Failed
class MyException(Exception):
def __init__(self, data):
self.data = data
@flow
def foo():
try:
bar()
except MyException as exc:
print(exc.data)
return True
@task
def bar():
return Failed(message="oh no", data=MyException(data="test"))
foo()
Lee Mendelowitz
03/01/2023, 7:00 PM