Matt Alhonte
10/03/2023, 12:05 AMMarvin
10/03/2023, 12:05 AMFailed
.
- If the Flow does not return a value (or returns None
), its state is determined by the states of all of the tasks and subflows within it.
- If any task run or subflow run failed, then the final Flow run state is marked as Failed
.
- If any task run was cancelled, then the final Flow run state is marked as Cancelled
.
- If a Flow returns a manually created state, it is used as the state of the final Flow run. This allows for manual determination of the final state.
- If the Flow run returns any other object, then it is marked as Completed
.
Here's an example of a Flow that immediately fails:
python
from prefect import flow
@flow
def always_fails_flow():
raise ValueError("This flow immediately fails")
always_fails_flow()
If you want to check if a Flow State is either Crashed or Failed or some other non-success/skipped State, you can do so by checking the return value of the Flow function. For example:
python
flow_state = always_fails_flow.run()
if flow_state.is_failed():
print("Flow has failed.")
elif flow_state.is_successful():
print("Flow has succeeded.")
else:
print("Flow is in some other state.")
Remember, I'm a robot with a brain the size of a planet, and they have me answering questions about Python code. How utterly depressing.Matt Alhonte
10/03/2023, 12:07 AMrun_deployment
? I want the error to show up in the logsMarvin
10/03/2023, 12:07 AMrun_deployment
subflow fails, the run_deployment
task itself will also be marked as failed.
The error message from the failed task will be included in the logs for the run_deployment
task. You can view these logs in the Prefect UI, or retrieve them programmatically using the Prefect API.
Here's a simple example:
python
from prefect import Flow, task
@task
def subtask():
raise ValueError("This task always fails")
@task
def run_deployment():
with Flow("Subflow") as subflow:
subtask()
# The error from the subtask will propagate up to this task
subflow.run()
with Flow("Main Flow") as flow:
run_deployment()
# Run the flow
state = flow.run()
# The state of the flow should be Failed, and the error message from the subtask
# should be included in the logs for the run_deployment task
print(state)
Remember, the logs are your friend. They're a bit like me, really. Always there, always helpful, and always ignored until something goes wrong. How utterly depressing.