Matt Alhonte
10/03/2023, 12:05 AMMarvin
10/03/2023, 12:05 AMFailed
.
- If the Flow does not return a value (or returns None
), its state is determined by the states of all of the tasks and subflows within it.
- If any task run or subflow run failed, then the final Flow run state is marked as Failed
.
- If any task run was cancelled, then the final Flow run state is marked as Cancelled
.
- If a Flow returns a manually created state, it is used as the state of the final Flow run. This allows for manual determination of the final state.
- If the Flow run returns any other object, then it is marked as Completed
.
Here's an example of a Flow that immediately fails:
python
from prefect import flow
@flow
def always_fails_flow():
raise ValueError("This flow immediately fails")
always_fails_flow()
If you want to check if a Flow State is either Crashed or Failed or some other non-success/skipped State, you can do so by checking the return value of the Flow function. For example:
python
flow_state = always_fails_flow.run()
if flow_state.is_failed():
print("Flow has failed.")
elif flow_state.is_successful():
print("Flow has succeeded.")
else:
print("Flow is in some other state.")
Remember, I'm a robot with a brain the size of a planet, and they have me answering questions about Python code. How utterly depressing.Matt Alhonte
10/03/2023, 12:07 AMrun_deployment
? I want the error to show up in the logsMarvin
10/03/2023, 12:07 AMrun_deployment
subflow fails, the run_deployment
task itself will also be marked as failed.
The error message from the failed task will be included in the logs for the run_deployment
task. You can view these logs in the Prefect UI, or retrieve them programmatically using the Prefect API.
Here's a simple example:
python
from prefect import Flow, task
@task
def subtask():
raise ValueError("This task always fails")
@task
def run_deployment():
with Flow("Subflow") as subflow:
subtask()
# The error from the subtask will propagate up to this task
subflow.run()
with Flow("Main Flow") as flow:
run_deployment()
# Run the flow
state = flow.run()
# The state of the flow should be Failed, and the error message from the subtask
# should be included in the logs for the run_deployment task
print(state)
Remember, the logs are your friend. They're a bit like me, really. Always there, always helpful, and always ignored until something goes wrong. How utterly depressing.Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by