Demid
07/08/2025, 8:54 AM
Marvin
07/08/2025, 8:54 AM
Marvin
07/08/2025, 8:55 AM
1. Awaiting Tasks Directly
python
from prefect import flow, task

@task
async def risky_task():
    # Your async code here
    result = await some_async_operation()
    return result

@flow
async def main_flow():
    # Await the task directly for normal execution
    result = await risky_task()
2. Using allow_failure
python
from prefect import flow, task, allow_failure

@flow
async def resilient_flow():
    # This task will not fail the flow if it errors
    result = await allow_failure(risky_task())
    if result.status == "failed":
        print(f"Task failed with error: {result.error}")
    else:
        print(f"Task succeeded with result: {result.value}")
3. Combining with Retries
python
from prefect import flow, task, allow_failure

@task(retries=3, retry_delay_seconds=60)
async def retry_task():
    # This task will retry up to 3 times with 1-minute delays
    result = await some_flaky_operation()
    return result

@flow
async def robust_flow():
    # Allow failure but still retry
    result = await allow_failure(retry_task())
Key points to remember:
1. Always await async tasks: never forget to await async tasks, or you'll get unexpected behavior.
2. Use allow_failure strategically:
- Use it for non-critical tasks that shouldn't stop the entire flow
- Check the result status to handle failures appropriately
- You can still access the error information through the result object
3. Handling Results:
python
@flow
async def handling_flow():
    result = await allow_failure(risky_task())
    match result.status:
        case "completed":
            return result.value
        case "failed":
            # Log the error but continue
            print(f"Task failed: {result.error}")
            return default_value  # fallback value defined elsewhere
4. Multiple Tasks:
python
import asyncio

@flow
async def parallel_flow():
    # Run multiple tasks concurrently with failure allowance
    results = await asyncio.gather(
        allow_failure(task1()),
        allow_failure(task2()),
        task3(),  # This one will still fail the flow if it errors
    )
Remember that allow_failure doesn't prevent the task from failing - it just prevents that failure from propagating to the flow level. You should still monitor these failures through Prefect's UI or logging system to maintain visibility into your pipeline's health.
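One lightweight way to keep that visibility is to log every tolerated failure explicitly so it shows up in the flow run's logs and the UI. A minimal sketch, using Prefect's standard get_run_logger; optional_step and visible_failures_flow are hypothetical names:
python
from prefect import flow, get_run_logger, task

@task
async def optional_step():
    # Non-critical work that is allowed to fail.
    raise ValueError("optional step broke")

@flow
async def visible_failures_flow():
    logger = get_run_logger()
    try:
        await optional_step()
    except Exception as exc:
        # The flow keeps going, but the failure stays visible in the logs/UI.
        logger.warning("optional_step failed and was skipped: %s", exc)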
For production systems, it's recommended to:
1. Set up proper monitoring and alerting for failed tasks
2. Use retries for transient failures
3. Implement appropriate fallback logic for permanent failures (a combined sketch of points 2 and 3 follows this list)
4. Document which tasks are allowed to fail and why
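For example, here is a minimal sketch combining points 2 and 3 - retries for transient failures plus a fallback for permanent ones. It assumes Prefect 2.x async tasks; fetch_latest_metrics, the simulated ConnectionError, and the cached fallback are hypothetical stand-ins:
python
import random

from prefect import flow, task

@task(retries=2, retry_delay_seconds=10)
async def fetch_latest_metrics() -> dict:
    # Simulated transient failure; Prefect retries this task automatically.
    if random.random() < 0.5:
        raise ConnectionError("metrics endpoint timed out")
    return {"source": "live"}

@flow
async def metrics_flow() -> dict:
    try:
        # Retries absorb transient failures (point 2).
        return await fetch_latest_metrics()
    except Exception as exc:
        # Fallback logic for failures that survive the retries (point 3).
        print(f"Falling back to cached metrics: {exc}")
        return {"source": "cache"}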
This approach gives you both resilience and visibility into your async task execution while maintaining control over how failures affect your overall flow.
Demid
07/08/2025, 8:59 AM
result = await allow_failure(freshness_check
.with_options(task_run_name=asset_key)
.submit(asset_key=asset_key))
Marvin
07/08/2025, 8:59 AM
Marvin
07/08/2025, 9:01 AM
python
result = await allow_failure(
    freshness_check
    .with_options(task_run_name=asset_key)
    .submit(asset_key=asset_key)
)
Your syntax is almost correct, but there are a few things to note:
1. The basic structure is correct - using allow_failure with with_options and submit
2. The indentation should be consistent for better readability
3. Make sure allow_failure is imported correctly:
python
from prefect import allow_failure
This pattern is useful when you want to:
1. Give the task run a specific name (using with_options(task_run_name=...))
2. Submit it for asynchronous execution (.submit())
3. Allow it to fail without failing the entire flow (allow_failure)
If you want to check the result later, remember you can do:
python
if result.status == "completed":
    value = result.value
elif result.status == "failed":
    error = result.error
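As a related sketch (not the only way to do this), the same naming-plus-failure-tolerance idea can also be written with return_state=True, which hands back the task's final State object instead of raising on failure. This assumes Prefect 2.x; freshness_flow, the placeholder check body, and the asset keys are hypothetical:
python
from prefect import flow, task

@task
async def freshness_check(asset_key: str) -> bool:
    # Placeholder check; replace with your real freshness logic.
    return True

@flow
async def freshness_flow(asset_keys: list[str]):
    for asset_key in asset_keys:
        # return_state=True returns the task's final State, so a failed
        # check does not abort the flow.
        state = await freshness_check.with_options(task_run_name=asset_key)(
            asset_key, return_state=True
        )
        if state.is_failed():
            print(f"{asset_key}: freshness check failed ({state.message})")
        else:
            print(f"{asset_key}: freshness check passed")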
Demid
07/08/2025, 9:20 AM
Marvin
07/08/2025, 9:20 AM
Marvin
07/08/2025, 9:21 AM
python
from prefect import flow, task, allow_failure

class FreshnessCheckError(Exception):
    """Custom exception for freshness check failures"""
    pass

@task
async def freshness_check(asset_key: str, threshold_hours: int = 24) -> bool:
    # Simulate some freshness check logic
    if asset_key == "stale_data":
        raise FreshnessCheckError(f"Data for {asset_key} is older than {threshold_hours} hours")
    return True

@flow
async def data_quality_flow():
    # Example 1: Basic allow_failure with raised exception
    result1 = await allow_failure(
        freshness_check
        .with_options(task_run_name="stale_check_1")
        .submit(asset_key="stale_data")
    )
    if result1.status == "failed":
        print(f"Check 1 failed as expected: {result1.error}")  # Will show FreshnessCheckError

    # Example 2: Handling multiple assets with different outcomes
    assets = ["good_data", "stale_data", "another_good_data"]
    results = []
    for asset in assets:
        result = await allow_failure(
            freshness_check
            .with_options(task_run_name=f"check_{asset}")
            .submit(asset_key=asset)
        )
        results.append((asset, result))

    # Process results and take action based on failures
    failed_assets = []
    for asset, result in results:
        if result.status == "failed":
            failed_assets.append(asset)
            print(f"Asset {asset} failed freshness check: {result.error}")
        else:
            print(f"Asset {asset} passed freshness check")

    # You can still raise an exception after collecting results if needed
    if len(failed_assets) > 2:  # Business logic threshold
        raise RuntimeError(f"Too many failed freshness checks: {failed_assets}")

    return failed_assets

# Run the flow
if __name__ == "__main__":
    import asyncio

    failed = asyncio.run(data_quality_flow())
    print(f"Failed assets: {failed}")
This example demonstrates:
1. A custom exception (FreshnessCheckError) for specific error cases
2. Using allow_failure with with_options and submit
3. Checking result status and accessing error information
4. Collecting multiple results and making decisions based on them
5. Still being able to raise exceptions at the flow level based on accumulated results
The output would look something like:
Check 1 failed as expected: FreshnessCheckError("Data for stale_data is older than 24 hours")
Asset good_data passed freshness check
Asset stale_data failed freshness check: FreshnessCheckError("Data for stale_data is older than 24 hours")
Asset another_good_data passed freshness check
Failed assets: ['stale_data']
Key points:
- The original exception is preserved in result.error
- You can still access the error type and message
- The flow continues executing despite task failures
- You can make flow-level decisions based on accumulated task failures
- Task names are clearly visible in the UI thanks to with_options(task_run_name=...)
This pattern is particularly useful when you want to:
1. Allow certain tasks to fail without stopping the flow
2. Collect information about all failures before making a decision (a concurrent variant is sketched below)
3. Raise flow-level exceptions based on aggregate task results
4. Keep clear tracking of which specific tasks failed in the Prefect UI
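Finally, a compact variant of the collect-then-decide idea that runs all checks concurrently. This is only a sketch, assuming Prefect 2.x and asyncio.gather with return_state=True so failed checks come back as States instead of raising; run_all_checks and the threshold are hypothetical:
python
import asyncio

from prefect import flow, task

@task
async def freshness_check(asset_key: str) -> bool:
    # Placeholder check; replace with your real freshness logic.
    return True

@flow
async def run_all_checks(assets: list[str]):
    # Launch every check concurrently; return_state=True keeps failures
    # from propagating so they can be inspected afterwards.
    states = await asyncio.gather(
        *[
            freshness_check.with_options(task_run_name=f"check_{asset}")(
                asset, return_state=True
            )
            for asset in assets
        ]
    )
    failed = [asset for asset, state in zip(assets, states) if state.is_failed()]
    if len(failed) > 2:  # Same business-logic threshold as above
        raise RuntimeError(f"Too many failed freshness checks: {failed}")
    return failed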