eres
07/24/2025, 12:11 PM
Marvin
07/24/2025, 12:11 PM
Marvin
07/24/2025, 12:13 PM
You can use the Abort exception from prefect.exceptions to immediately stop a flow run. Here's how to use it:
python
from prefect import flow
from prefect.exceptions import Abort

@flow
def my_flow():
    # Some code...
    if some_condition:  # placeholder for your own stop condition
        raise Abort("Stopping the flow run immediately")
    # Rest of the flow...
When raised, this will immediately stop the flow run and mark it as "Aborted". The message you provide will be recorded in the flow run logs.
This is different from CancelledRun (which is raised when retrieving results from an already cancelled run) or TerminationSignal (which is used internally for signal-based termination). The Abort exception is specifically designed for programmatically stopping a flow run from within the flow code itself.
Note that this is a controlled way to stop execution - it will properly handle the flow run state and logging, rather than just crashing the flow. If you need to pause rather than stop completely, you might want to look at the Pause exception instead.
eres
07/27/2025, 6:57 AM
from prefect import flow, task
from prefect import flow, get_run_logger
import time
from prefect.exceptions import TerminationSignal, Abort

@task
def my_task():
    return "my string"

@task
def my_task1(y):
    raise Exception
    print(y)
    return y

@task
def my_task2(y):
    print(y)
    return y

@flow
def my_flow_abort():
    logger = get_run_logger()
    logger.info("This is [bold red]fancy[/]")
    try:
        future = my_task.submit()
        result = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task")
        raise e
    try:
        future = my_task1.submit(result)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task1")
        Abort()
    try:
        future = my_task2.submit(result)
        result13 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e
    try:
        future = my_task2.submit(result13)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e

if __name__ == "__main__":
    my_flow_abort()
Marvin
07/27/2025, 6:57 AM
Marvin
07/27/2025, 6:58 AM
Your code calls Abort() instead of raising it. Here's the corrected version of your code:
python
from prefect import flow, task, get_run_logger
import time
from prefect.exceptions import TerminationSignal, Abort

@task
def my_task():
    return "my string"

@task
def my_task1(y):
    raise Exception
    print(y)
    return y

@task
def my_task2(y):
    print(y)
    return y

@flow
def my_flow_abort():
    logger = get_run_logger()
    logger.info("This is [bold red]fancy[/]")
    try:
        future = my_task.submit()
        result = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task")
        raise e
    try:
        future = my_task1.submit(result)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task1")
        raise Abort()  # Changed this line - need to raise the exception
    try:
        future = my_task2.submit(result)
        result13 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e
    try:
        future = my_task2.submit(result13)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e

if __name__ == "__main__":
    my_flow_abort()
The key change is in the exception handling for `my_task1`:
python
except Exception as e:
    print("handle execption in my_task1")
    raise Abort()  # Changed from Abort() to raise Abort()
The reason your original code wasn't working is that you were creating an Abort exception object but not actually raising it. In Python, you need to use the raise keyword to trigger an exception. When properly raised, the Abort exception will cause the flow to terminate immediately.
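The difference between constructing an exception and raising it can be seen in plain Python, without Prefect installed. The sketch below is only an illustration: the Abort class here is a hypothetical stand-in defined locally (not the class from prefect.exceptions), and run_steps is an invented helper name.

```python
class Abort(BaseException):
    """Hypothetical stand-in for prefect.exceptions.Abort so the sketch runs without Prefect."""


def run_steps(raise_it: bool) -> list:
    """Run two steps; after the first one fails, either construct or raise Abort."""
    executed = []
    try:
        try:
            executed.append("step1")
            raise ValueError("step1 failed")
        except Exception:
            if raise_it:
                raise Abort("stopping")  # propagates out of the handler
            else:
                Abort("stopping")  # object is created and discarded; nothing happens
        executed.append("step2")  # only reached when Abort was not raised
    except Abort:
        executed.append("aborted")
    return executed


print(run_steps(raise_it=False))  # ['step1', 'step2']
print(run_steps(raise_it=True))   # ['step1', 'aborted']
```

With raise_it=False the second step still runs, which mirrors the behavior of the original code where my_task2 was still submitted.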
After this change, when my_task1 fails:
1. The exception will be caught in the try/except block
2. "handle execption in my_task1" will be printed
3. The Abort exception will be raised
4. The flow will terminate immediately, and the subsequent tasks (my_task2) won't be executed
This should now properly abort your flow when an exception occurs in my_task1.
eres
07/27/2025, 7:13 AM
Marvin
07/27/2025, 7:13 AM
Marvin
07/27/2025, 7:15 AM
python
from prefect import flow, task, get_run_logger
from prefect.exceptions import TerminationSignal, Abort
from prefect.states import Aborted, Failed

@task
def my_task():
    return "my string"

@task
def my_task1(y):
    raise Exception
    print(y)
    return y

@task
def my_task2(y):
    print(y)
    return y

@flow
def my_flow_abort():
    logger = get_run_logger()
    logger.info("This is [bold red]fancy[/]")
    try:
        future = my_task.submit()
        result = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task")
        raise e
    try:
        future = my_task1.submit(result)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task1")
        # Create an Aborted state with details
        aborted_state = Aborted(
            message="Flow manually aborted due to exception in my_task1",
            data={"error": str(e)}
        )
        raise Abort(
            message="Flow execution aborted due to task failure",
            state=aborted_state
        )
    try:
        future = my_task2.submit(result)
        result13 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e
    try:
        future = my_task2.submit(result13)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e

if __name__ == "__main__":
    try:
        my_flow_abort()
    except Abort as e:
        print(f"Flow aborted: {e}")
The key changes in this version are:
1. We're using Aborted state instead of Failed state to be more explicit about the intention
2. We're including both a message and data in the state
3. We're properly constructing the Abort exception with both a message and a state
If you're still seeing the flow stuck in a running state, there might be a few other things to try:
1. Make sure you're running the latest version of Prefect
2. You could try setting an explicit timeout:
python
from prefect import settings
with settings.PREFECT_API_REQUEST_TIMEOUT.set(30):
    my_flow_abort()
3. If the issue persists, you could try forcing a state change using the API directly:
python
from prefect import flow, task, get_run_logger
from prefect.exceptions import TerminationSignal, Abort
from prefect.states import Aborted
from prefect.client import get_client

@flow
def my_flow_abort():
    try:
        # ... your existing flow code ...
    except Exception as e:
        print("handle execption in my_task1")
        # Force state change through API
        async def set_aborted_state():
            async with get_client() as client:
                await client.set_flow_run_state(
                    flow_run_id=flow_run.id,
                    state=Aborted(message="Flow manually aborted")
                )
        from prefect.utilities.asyncutils import sync_compatible
        sync_compatible(set_aborted_state)()
        raise Abort("Flow execution aborted")
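The general shape of calling an async helper from synchronous code can be sketched with the standard library alone. This is an illustration under stated assumptions, not Prefect's own mechanism: FakeClient, mark_run, and mark_run_sync are hypothetical names, the fake client only records calls, and asyncio.run is swapped in for the sync_compatible call used above.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for an async API client; records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not suppress exceptions

    async def set_flow_run_state(self, flow_run_id, state):
        self.calls.append((flow_run_id, state))


async def mark_run(client, flow_run_id, state):
    """Async helper: open the client and request a state change."""
    async with client as c:
        await c.set_flow_run_state(flow_run_id=flow_run_id, state=state)


def mark_run_sync(client, flow_run_id, state):
    """Synchronous wrapper; asyncio.run only works when no event loop is already running."""
    asyncio.run(mark_run(client, flow_run_id, state))


client = FakeClient()
mark_run_sync(client, "run-123", "FAILED")
print(client.calls)  # [('run-123', 'FAILED')]
```

Note that the helper takes the run ID as an explicit argument, so it never depends on a name that is not defined in the enclosing scope.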
Could you try this updated version and let me know if you're still seeing the flow stuck in a running state? Also, it would be helpful to know:
1. Which version of Prefect you're running
2. Whether you're running this locally or in a deployment
3. Whether you see any specific error messages in the logs
eres
07/27/2025, 7:22 AM
Marvin
07/27/2025, 7:22 AM
Marvin
07/27/2025, 7:23 AM
python
from prefect import flow, task, get_run_logger
from prefect.exceptions import TerminationSignal, Abort
from prefect.states import Failed, Cancelled

@task
def my_task():
    return "my string"

@task
def my_task1(y):
    raise Exception
    print(y)
    return y

@task
def my_task2(y):
    print(y)
    return y

@flow
def my_flow_abort():
    logger = get_run_logger()
    logger.info("This is [bold red]fancy[/]")
    try:
        future = my_task.submit()
        result = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task")
        raise e
    try:
        future = my_task1.submit(result)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task1")
        # Create a Failed state with details
        failed_state = Failed(
            message="Flow manually aborted due to exception in my_task1",
        )
        raise Abort(
            message="Flow execution aborted due to task failure",
            state=failed_state
        )
    try:
        future = my_task2.submit(result)
        result13 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e
    try:
        future = my_task2.submit(result13)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task2")
        raise e

if __name__ == "__main__":
    try:
        my_flow_abort()
    except Abort as e:
        print(f"Flow aborted: {e}")
The key changes in this version:
1. Removed the non-existent Aborted state and replaced it with Failed
2. Simplified the state creation as Prefect 3.x has a more streamlined state system
3. Still maintaining the explicit state handling with the Abort exception
If you're still seeing issues with the flow state, you could also try using Cancelled state instead:
python
try:
    future = my_task1.submit(result)
    result1 = future.result(raise_on_failure=True)
except Exception as e:
    print("handle execption in my_task1")
    cancelled_state = Cancelled(
        message="Flow cancelled due to exception in my_task1",
    )
    raise Abort(
        message="Flow execution cancelled due to task failure",
        state=cancelled_state
    )
Could you try this updated version and let me know if it properly handles the flow state now? Also, it would be helpful to know which version of Prefect you're running (prefect version) to ensure we're providing the most appropriate solution for your environment.
eres
07/27/2025, 7:34 AM
Marvin
07/27/2025, 7:34 AM
Marvin
07/27/2025, 7:35 AM
1. Raising Abort() or TerminationSignal doesn't automatically change the flow's state
2. The flow state needs to be explicitly managed during termination
3. There's a specific pattern needed to ensure proper state transitions
Here's the correct way to handle flow abortion in Prefect 3.x:
python
from prefect import flow, task, get_run_logger
from prefect.exceptions import TerminationSignal
from prefect.states import Failed

@task
def my_task():
    return "my string"

@task
def my_task1(y):
    raise Exception
    print(y)
    return y

@task
def my_task2(y):
    print(y)
    return y

@flow
def my_flow_abort():
    logger = get_run_logger()
    logger.info("This is [bold red]fancy[/]")
    try:
        future = my_task.submit()
        result = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task")
        raise e
    try:
        future = my_task1.submit(result)
        result1 = future.result(raise_on_failure=True)
    except Exception as e:
        print("handle execption in my_task1")
        # First log the error
        logger.error(f"Task failed with error: {str(e)}")
        # Create a Failed state with detailed information
        failed_state = Failed(
            message="Flow failed due to task1 error",
            data={"error": str(e)},
        )
        # Raise a TerminationSignal with the failed state
        raise TerminationSignal(
            message="Flow terminated due to task failure",
            state=failed_state
        )
    # ... rest of your code ...

if __name__ == "__main__":
    try:
        my_flow_abort()
    except (TerminationSignal, Exception) as e:
        print(f"Flow terminated: {e}")
Key changes and explanations:
1. We're using TerminationSignal instead of Abort() as it's more explicit about the intention
2. We're creating a proper Failed state with detailed information
3. We're explicitly setting the state when raising the termination signal
4. We're handling both TerminationSignal and regular exceptions in the main block
The reason this works better is that:
- The Failed state is a terminal state that properly indicates the flow has stopped
- The TerminationSignal with an explicit state ensures proper state transition
- The error information is properly captured in the state's data
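One plausible reason for listing both TerminationSignal and Exception in the main block is that signal-style exceptions are often derived from BaseException rather than Exception, so a bare `except Exception` would not catch them. The sketch below shows that Python behavior with a hypothetical StopSignal class; whether Prefect's own signal classes follow this pattern is an assumption worth checking against the installed version.

```python
class StopSignal(BaseException):
    """Hypothetical signal type; derives from BaseException, not Exception."""


def classify(exc: BaseException) -> str:
    """Raise exc and report which handler catches it."""
    try:
        raise exc
    except Exception:
        return "caught by except Exception"
    except StopSignal:
        return "caught by except StopSignal"


print(classify(RuntimeError("boom")))  # caught by except Exception
print(classify(StopSignal("stop")))    # caught by except StopSignal
```

The same property explains why a signal raised inside a flow passes straight through any `except Exception` blocks wrapped around later tasks.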
If you're running this as part of a deployment, you might also want to add flow-level error handling:
python
@flow(
    name="my_flow_abort",
    on_failure=[lambda flow, flow_run, state: print(f"Flow failed with state: {state}")],
)
def my_flow_abort():
    # ... rest of your code ...
This pattern ensures that:
1. The flow won't get stuck in a running state
2. The failure is properly recorded
3. The state transition is handled correctly
4. Error information is preserved for debugging
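The idea behind a failure hook can be sketched in plain Python: run a function, and if it raises, hand a description of the failure to each registered callback before re-raising. This is a simplified illustration, not Prefect's implementation; run_with_failure_hooks and the two-argument hook signature are hypothetical.

```python
def run_with_failure_hooks(fn, hooks):
    """Call fn(); if it raises, pass a failure description to every hook, then re-raise."""
    try:
        return fn()
    except Exception as exc:
        state = f"Failed: {exc}"
        for hook in hooks:
            hook(fn.__name__, state)
        raise


seen = []


def failing_flow():
    raise RuntimeError("task1 error")


try:
    run_with_failure_hooks(failing_flow, [lambda name, state: seen.append((name, state))])
except RuntimeError:
    pass  # the original error still reaches the caller after the hooks have run

print(seen)  # [('failing_flow', 'Failed: task1 error')]
```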
Let me know if you try this version and still experience any issues with the flow state.