Tsvetelin
07/31/2025, 9:15 AMMarvin
07/31/2025, 9:15 AMMarvin
07/31/2025, 9:15 AMpython
from prefect import task, flow
@task
def risky_task():
raise ValueError("Something went wrong!")
@flow
def my_flow():
future = risky_task.submit()
try:
result = future.result() # This will raise the original exception
except Exception as e:
# Handle the exception here
print(f"Caught exception: {e}")
# Execute your custom error handling function
handle_error(e)
2. Using .wait()
to check the state:
python
from prefect import task, flow
from prefect.states import Failed
@task
def risky_task():
raise ValueError("Something went wrong!")
@flow
def my_flow():
future = risky_task.submit()
state = future.wait() # Gets the final state without raising
if isinstance(state, Failed):
exception = state.result(raise_on_failure=False) # Get the exception without raising
# Handle the error
handle_error(exception)
The first approach is more straightforward if you just want to catch and handle the exception like regular Python code. The second approach gives you more control and access to Prefect's state system.
A few important notes:
1. .result()
will block until the task completes and raise any exception that occurred
2. .wait()
returns the final state object, which you can inspect without raising exceptions
3. If you need to access the exception without raising it, you can use state.result(raise_on_failure=False)
on a Failed state
Would you like me to explain more about any of these approaches or show how to handle specific types of exceptions?Tsvetelin
07/31/2025, 9:18 AMMarvin
07/31/2025, 9:18 AMMarvin
07/31/2025, 9:19 AMpython
from prefect import task, flow
from typing import List
from prefect.states import Failed
@task
def process_item(item: int):
if item % 2 == 0:
raise ValueError(f"Error processing item {item}")
return item * 2
@flow
def parallel_flow(items: List[int]):
# Submit all tasks
futures = [process_item.submit(item) for item in items]
# Handle results
results = []
errors = []
for i, future in enumerate(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
errors.append((items[i], e))
# Process errors if needed
if errors:
print(f"Encountered {len(errors)} errors:")
for item, error in errors:
print(f"Item {item} failed with error: {error}")
return results, errors
2. Using State Objects (Non-blocking):
python
from prefect import task, flow
from prefect.states import Failed, Completed
@task
def process_item(item: int):
if item % 2 == 0:
raise ValueError(f"Error processing item {item}")
return item * 2
@flow
def parallel_flow(items: List[int]):
# Submit all tasks
futures = [process_item.submit(item) for item in items]
# Wait for all tasks to complete
states = [future.wait() for future in futures]
# Separate successful and failed results
successful = []
failed = []
for item, state in zip(items, states):
if isinstance(state, Completed):
successful.append((item, state.result()))
elif isinstance(state, Failed):
failed.append((item, state.result(raise_on_failure=False)))
return successful, failed
# Usage
items = range(5)
successful, failed = parallel_flow(items)
print("Successful:", successful)
print("Failed:", failed)
3. Continue on Failure Pattern:
python
from prefect import task, flow
from concurrent.futures import as_completed
from typing import List
@task
def process_item(item: int):
if item % 2 == 0:
raise ValueError(f"Error processing item {item}")
return item * 2
@flow
def parallel_flow(items: List[int]):
# Submit all tasks
futures = [process_item.submit(item) for item in items]
# Process results as they complete
results = []
errors = []
# Use as_completed to handle results in order of completion
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
errors.append(e)
# You could trigger immediate error handling here
handle_error(e) # Your error handling function
continue # Continue processing other tasks
return results, errors
Key points about these patterns:
1. All examples allow parallel execution to continue even if some tasks fail
2. You can choose between:
- Blocking .result()
which raises exceptions
- Non-blocking .wait()
which returns state objects
3. The patterns give you different ways to:
- Collect all errors for later processing
- Handle errors as they occur
- Separate successful and failed results
Choose the pattern that best fits your needs:
- Use pattern 1 if you want simple exception handling
- Use pattern 2 if you want to work with Prefect states directly
- Use pattern 3 if you want to process results as they complete and handle errors immediately
Would you like me to elaborate on any of these patterns or show how to add specific error handling logic (like retries or custom error processing)?