Tinh
07/11/2025, 5:46 AM
flow A: tasks A1, A2
flow B: tasks B1, B2
flow C: tasks C1, C2
flow D
Now my workflow is:
A1 -> B1 -> C1 -> D
A2 -> B2 -> C2 -> D
How can I set it up so that if any task in A, B, or C fails, the next dependent task is skipped (e.g. B1 fails --> skip C1), while the workflow still completes and the other path still runs through to D?
Marvin
07/11/2025, 5:46 AM
Marvin
07/11/2025, 5:48 AM
```python
from prefect import flow, task
from typing import Optional

@task(retries=0)  # Set retries=0 to fail immediately
def task_a1():
    # Your A1 task logic
    pass

@task(retries=0)
def task_a2():
    # Your A2 task logic
    pass

@task(retries=0)
def task_b1(a1_result: Optional[Any] = None):
    if a1_result is None:
        return None
    # Your B1 task logic
    pass

@task(retries=0)
def task_b2(a2_result: Optional[Any] = None):
    if a2_result is None:
        return None
    # Your B2 task logic
    pass

@task(retries=0)
def task_c1(b1_result: Optional[Any] = None):
    if b1_result is None:
        return None
    # Your C1 task logic
    pass

@task(retries=0)
def task_c2(b2_result: Optional[Any] = None):
    if b2_result is None:
        return None
    # Your C2 task logic
    pass

@task(retries=0)
def task_d(c1_result: Optional[Any] = None, c2_result: Optional[Any] = None):
    # Task D will run even if one or both inputs are None
    # Your D task logic
    pass

@flow
def main_flow():
    # Execute A tasks in parallel
    a1_result = task_a1.submit()
    a2_result = task_a2.submit()
    # B tasks depend on A tasks
    b1_result = task_b1.submit(a1_result)
    b2_result = task_b2.submit(a2_result)
    # C tasks depend on B tasks
    c1_result = task_c1.submit(b1_result)
    c2_result = task_c2.submit(b2_result)
    # D depends on both C tasks
    final_result = task_d.submit(c1_result, c2_result)
    return final_result
```
Key points about this implementation:
1. Each task checks whether its input is `None` (which would indicate an upstream failure) and, if so, returns `None`, effectively skipping its execution.
2. The tasks are submitted using `.submit()`, which allows for parallel execution.
3. If any task fails and returns `None` instead of raising (see the error handling below):
   - Its immediate downstream task will receive `None` as input
   - The downstream task will skip its execution and pass `None` forward
   - The parallel path will continue unaffected
4. Task D will execute regardless of upstream failures, even if both paths fail; it receives `None` for any path that did not complete.
5. Setting `retries=0` ensures tasks fail immediately instead of retrying (0 is also the default), but you can adjust this based on your needs.
To handle task failures gracefully, each task should include appropriate error handling:
```python
@task(retries=0)
def task_b1(a1_result: Optional[Any] = None):
    if a1_result is None:
        return None
    try:
        # Your B1 task logic (do_something is a placeholder for it)
        result = do_something(a1_result)
        return result
    except Exception as e:
        # Log the error if needed
        return None
```
This pattern ensures that:
- If A1 fails, B1 and C1 will be skipped
- If A2 fails, B2 and C2 will be skipped
- Task D will still execute with whatever results are available
- Parallel paths remain independent
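A framework-free sketch of this pattern in plain Python (no Prefect involved), assuming every step catches its own exceptions: a step returns `None` when its input is `None` or when its own logic raises, so a failure in B1 skips C1 while the A2 -> B2 -> C2 path still reaches D. The helper names `skip_on_none` and `run_path`, the `FAIL_B1` flag, and the strings standing in for the A1/A2 results are all hypothetical.

```python
from typing import Any, Callable, List, Optional

FAIL_B1 = True  # hypothetical switch that makes B1 fail


def skip_on_none(step: Callable[[Any], Any]) -> Callable[[Optional[Any]], Optional[Any]]:
    """Wrap a one-argument step: skip on a None input, turn exceptions into None."""
    def wrapped(value: Optional[Any]) -> Optional[Any]:
        if value is None:
            return None  # upstream failed or was skipped
        try:
            return step(value)
        except Exception:
            return None  # this step failed; downstream steps will skip
    return wrapped


def run_path(start: Any, steps: List[Callable[[Any], Any]]) -> Optional[Any]:
    """Feed each step's result into the next one."""
    value = start
    for step in steps:
        value = skip_on_none(step)(value)
    return value


def b1(a1_result: str) -> str:
    if FAIL_B1:
        raise RuntimeError("B1 failed")
    return f"{a1_result}->B1"


def d(c1_result: Optional[str], c2_result: Optional[str]) -> str:
    # D always runs and uses whichever results are available
    available = [r for r in (c1_result, c2_result) if r is not None]
    return f"D received {len(available)} result(s): {available}"


path1 = run_path("A1", [b1, lambda v: f"{v}->C1"])
path2 = run_path("A2", [lambda v: f"{v}->B2", lambda v: f"{v}->C2"])
print(d(path1, path2))  # D received 1 result(s): ['A2->B2->C2']
```

Because every failure is converted into `None`, nothing in this pattern ever ends in a failed state, so in Prefect the tasks would all finish as completed. Prefect itself does not turn a failed upstream task into a `None` input, which is why the `try`/`except` inside each task is what makes the pattern work.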
You can run this flow using:
```python
if __name__ == "__main__":
    main_flow()
```
The flow will complete even if some tasks fail, and you'll be able to see the execution status of each task in the Prefect UI.
Tinh
07/11/2025, 5:54 AM
`Optional[Any] = None`: what is expected to go in this option?
I have an error: NameError: name 'Any' is not defined. Did you mean: 'any'?
Marvin
07/11/2025, 5:54 AM
Marvin
07/11/2025, 5:54 AM
You need to import `Any` from the `typing` module. Let me explain the type annotations and their purpose.
Here's the complete import section you need:
```python
from prefect import flow, task
from typing import Optional, Any
```
The `Optional[Any] = None` pattern is used for type hinting in Python and serves two purposes:
1. `Optional[Any]` tells readers and type checkers that the parameter can be either:
   - any type of value (`Any`)
   - `None`
2. `= None` sets the default value of the parameter to `None`
You could write the type hints in different ways depending on your needs:
```python
from typing import Any, Optional

# Option 1: Most flexible - accepts any type or None
def task_b1(a1_result: Optional[Any] = None):
    pass

# Option 2: If you expect a specific type (e.g., string)
# str is a built-in type, so it does not need to be imported from typing
def task_b1(a1_result: Optional[str] = None):
    pass

# Option 3: If you don't care about type hints, you can simply use
def task_b1(a1_result=None):
    pass
```
Type hints are optional in Python - they're used for:
- Code documentation
- IDE support (better autocomplete and error detection)
- Static type checkers like mypy
If you don't need type checking, you can safely remove the type hints and just use `a1_result=None` as the parameter definition. The code will work exactly the same way.
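A quick standard-library check of both points: `Optional[Any]` is shorthand for `Union[Any, None]`, and annotations are recorded but not enforced when the function is called. The function `describe` is a hypothetical example.

```python
from typing import Any, Optional, Union, get_args, get_type_hints


def describe(a1_result: Optional[str] = None) -> str:
    # The hint says "str or None", but Python does not check it at call time
    return f"got {a1_result!r}"


# Optional[X] is the same type as Union[X, None]
print(Optional[Any] == Union[Any, None])  # True
print(get_args(Optional[Any]))            # the members: Any and NoneType

# Hints are metadata that tools such as mypy or an IDE can read...
print(get_type_hints(describe)["a1_result"])

# ...but a non-str argument still runs without error
print(describe(42))  # got 42
print(describe())    # got None
```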
Here's a simplified version without type hints if you prefer:
```python
from prefect import flow, task

@task(retries=0)
def task_b1(a1_result=None):
    if a1_result is None:
        return None
    # Your B1 task logic
    pass
```
Both versions will work identically at runtime; the choice between them depends on whether you want the additional static checking and documentation that type hints provide.
Tinh
07/11/2025, 6:05 AM
completed status (green), which is weird. Can I set this to an uncompleted status (red?) so that I can quickly see which task was incomplete?
Marvin
07/11/2025, 6:05 AM
Marvin
07/11/2025, 6:06 AM
```python
from prefect import flow, task
from prefect.states import Failed

@task(retries=0)
def task_b1(a1_result=None):
    if a1_result is None:
        return None
    try:
        # Your B1 task logic that might fail
        result = 1/0  # Example of something that will fail
        return result
    except Exception as e:
        # This will mark the task as failed (red) in the UI
        raise Exception(f"Task B1 failed: {str(e)}")

# Alternative approach using Failed state
@task(retries=0)
def task_b1_alternative(a1_result=None):
    if a1_result is None:
        return None
    try:
        # Your B1 task logic that might fail
        result = 1/0  # Example of something that will fail
        return result
    except Exception as e:
        # Create a Failed state with the error message
        return Failed(message=f"Task B1 failed: {str(e)}")
```
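One refinement to the re-raise in `task_b1`: building a new `Exception` from `str(e)` keeps only the message. Raising with `from e` (standard Python exception chaining) keeps the original error as `__cause__`, so its traceback still appears in the logs; the raised exception marks the task failed either way. A plain-Python sketch, where `do_b1_work` and `task_b1_logic` are hypothetical stand-ins for the B1 logic:

```python
def do_b1_work() -> float:
    # Hypothetical B1 logic that fails
    return 1 / 0


def task_b1_logic() -> float:
    try:
        return do_b1_work()
    except Exception as e:
        # "from e" chains the original error instead of discarding it
        raise RuntimeError(f"Task B1 failed: {e}") from e


try:
    task_b1_logic()
except RuntimeError as err:
    caught = err

print(caught)                  # Task B1 failed: division by zero
print(type(caught.__cause__))  # <class 'ZeroDivisionError'>
```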
And here's how to modify your flow to handle these failed states:
```python
@flow
def main_flow():
    try:
        # Execute A tasks in parallel
        a1_result = task_a1.submit()
        a2_result = task_a2.submit()
        # B tasks depend on A tasks
        b1_result = task_b1.submit(a1_result)
        b2_result = task_b2.submit(a2_result)
        # C tasks depend on B tasks
        c1_result = task_c1.submit(b1_result)
        c2_result = task_c2.submit(b2_result)
        # D depends on both C tasks
        final_result = task_d.submit(c1_result, c2_result)
        return final_result
    except Exception as e:
        print(f"Flow encountered an error: {str(e)}")
        # The flow will continue but the failed task will be marked as red
```
Key differences from the previous version:
1. Instead of silently returning `None`, we now raise `Exception` when a task fails. This will:
   - Mark the task as failed (red) in the UI
   - Still allow the flow to continue with parallel paths
   - Make it easy to spot which tasks failed
2. The alternative approach using the `Failed` state gives you more control over the failure message and state.
3. The flow's try-except block ensures that even if tasks fail, the flow continues but preserves the failed state of individual tasks.
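A caveat on point 3: with future-based execution, submitting work does not raise the task's exception; the exception surfaces only when the result is requested. Prefect futures work the same way (`future.result()` re-raises a task's failure by default), so a `try`/`except` around `.submit()` calls will generally not see task failures. The sketch uses the standard library's `concurrent.futures` as a stand-in for Prefect; `flaky_b1` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def flaky_b1() -> str:
    # Hypothetical task body that always fails
    raise ValueError("B1 failed!")


with ThreadPoolExecutor(max_workers=2) as pool:
    try:
        future = pool.submit(flaky_b1)  # returns immediately; nothing is raised here
        submit_raised = False
    except Exception:
        submit_raised = True

    try:
        future.result()  # the task's exception is re-raised here
        result_raised = False
    except ValueError as e:
        result_raised = True
        message = str(e)

print(submit_raised, result_raised, message)  # False True B1 failed!
```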
You can test this with a deliberately failing task:
```python
@task(retries=0)
def task_b1(a1_result=None):
    if a1_result is None:
        return None
    # Simulate a failure
    raise Exception("This is a test failure")
```
Now when you run the flow:
- Failed tasks will show as red in the UI
- You'll see the error message in the task's details
- Parallel paths will still complete
- Downstream tasks in the failed path will be skipped
This makes it much easier to identify which tasks failed in your workflow while maintaining the desired behavior of allowing parallel paths to complete.
Tinh
07/11/2025, 6:16 AM
completed status (green). Do you have a better solution for this?
Marvin
07/11/2025, 6:16 AM
Marvin
07/11/2025, 6:18 AM
```python
from prefect import flow, task
from prefect.utilities.annotations import allow_failure
from typing import Optional

@task(retries=0)
def task_a1():
    # Your A1 task logic
    return "A1 result"

@task(retries=0)
def task_a2():
    # Your A2 task logic
    return "A2 result"

@task(retries=0)
def task_b1(a1_result: Optional[str] = None):
    if a1_result is None:
        raise Exception("Upstream task failed")
    # Simulate failure
    raise Exception("B1 failed!")

@task(retries=0)
def task_b2(a2_result: Optional[str] = None):
    if a2_result is None:
        raise Exception("Upstream task failed")
    return "B2 result"

@task(retries=0)
def task_c1(b1_result: Optional[str] = None):
    if b1_result is None:
        raise Exception("Upstream task failed")
    return "C1 result"

@task(retries=0)
def task_c2(b2_result: Optional[str] = None):
    if b2_result is None:
        raise Exception("Upstream task failed")
    return "C2 result"

@task(retries=0)
def task_d(c1_result: Optional[str] = None, c2_result: Optional[str] = None):
    # Task D will run as long as at least one input is valid
    if c1_result is None and c2_result is None:
        raise Exception("All upstream paths failed")
    return "D completed"

@flow
def main_flow():
    # Execute A tasks in parallel
    a1_result = task_a1()
    a2_result = task_a2()
    # Use allow_failure to handle task failures while showing correct status
    b1_result = allow_failure(task_b1)(a1_result)
    b2_result = allow_failure(task_b2)(a2_result)
    # C tasks depend on B tasks
    c1_result = allow_failure(task_c1)(b1_result)
    c2_result = allow_failure(task_c2)(b2_result)
    # D depends on both C tasks
    final_result = task_d(c1_result, c2_result)
    return final_result

if __name__ == "__main__":
    main_flow()
```
Key changes in this solution:
1. We use Prefect's `allow_failure` utility to wrap task calls. This:
   - Shows the correct failed status (red) in the UI when a task fails
   - Allows the flow to continue execution
   - Properly propagates `None` values to downstream tasks
2. Tasks now raise exceptions instead of returning `None` when they fail or when their upstream dependencies fail
3. The flow structure remains the same, but we wrap task calls with `allow_failure`
4. Task D can still make the final decision based on which paths succeeded or failed
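For reference, the Prefect documentation describes `allow_failure` as a wrapper for an upstream *future or state passed as an input*, not for the task object: for example `task_c1.submit(allow_failure(b1_future))`, or `wait_for=[allow_failure(b1_future)]` when only ordering matters. Without it, a task whose input comes from a failed run is not started; with it, the task starts and, if the upstream run failed, receives the upstream exception in place of a value. The plain-Python model below imitates those two rules so the control flow can be run without Prefect; `Outcome`, `AllowFailure`, `run_task`, and `c_step` are hypothetical names, not Prefect APIs.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Outcome:
    """Simulated result of one task run."""
    state: str  # "COMPLETED", "FAILED" or "NOT_READY"
    value: Any = None
    error: Optional[BaseException] = None


@dataclass
class AllowFailure:
    """Marks an input whose upstream run is allowed to have failed."""
    outcome: Outcome


def run_task(fn: Callable[..., Any], *inputs: Any) -> Outcome:
    args = []
    for item in inputs:
        allowed = isinstance(item, AllowFailure)
        upstream = item.outcome if allowed else item
        if upstream.state == "COMPLETED":
            args.append(upstream.value)
        elif allowed:
            args.append(upstream.error)  # pass the upstream exception along
        else:
            return Outcome("NOT_READY")  # upstream did not complete: never starts
    try:
        return Outcome("COMPLETED", value=fn(*args))
    except Exception as exc:
        return Outcome("FAILED", error=exc)


def b1(a1_result: str) -> str:
    raise RuntimeError("B1 failed!")


def c_step(label: str) -> Callable[[Any], str]:
    def step(upstream: Any) -> str:
        if isinstance(upstream, BaseException):
            # Raising marks this step FAILED, so a skipped step does not look green
            raise RuntimeError(f"{label} skipped: upstream failed ({upstream})")
        return f"{label}({upstream})"
    return step


def d(c1_result: Any, c2_result: Any) -> str:
    ok = [r for r in (c1_result, c2_result) if not isinstance(r, BaseException)]
    return f"D used {len(ok)} of 2 paths"


a1, a2 = run_task(lambda: "A1"), run_task(lambda: "A2")
b1_out = run_task(b1, a1)
b2_out = run_task(lambda a: f"B2({a})", a2)
c1_out = run_task(c_step("C1"), AllowFailure(b1_out))
c2_out = run_task(c_step("C2"), AllowFailure(b2_out))
d_out = run_task(d, AllowFailure(c1_out), AllowFailure(c2_out))
without = run_task(c_step("C1"), b1_out)  # no AllowFailure on the failed input

print(c1_out.state, c2_out.state, d_out.value, without.state)
# FAILED COMPLETED D used 1 of 2 paths NOT_READY
```

Raising inside C1 when it receives an exception is one way to make a skipped step stand out as failed; returning a value instead would mark it completed.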
When you run this flow:
- If task_b1 fails, it will show as failed (red) in the UI
- The failure won't stop the parallel path through B2
- C1 will be skipped due to B1's failure
- Task D will still execute as long as at least one path succeeds
You can test different failure scenarios by modifying any task to raise an exception:
```python
@task(retries=0)
def task_b1(a1_result: Optional[str] = None):
    if a1_result is None:
        raise Exception("Upstream task failed")
    # Simulate a failure
    raise Exception("B1 failed intentionally!")
```
This solution provides:
- Correct visual feedback in the UI (failed tasks show as red)
- Proper failure handling and propagation
- Continued execution of parallel paths
- Clear indication of which tasks failed and why
The `allow_failure` wrapper is the key to achieving both the visual feedback you want and the workflow behavior you need.
Tinh
07/11/2025, 6:25 AM
TypeError: 'allow_failure' object is not callable
Marvin
07/11/2025, 6:25 AM
Marvin
07/11/2025, 6:26 AM
`allow_failure` has changed. Let me provide the correct implementation:
```python
from prefect import flow, task
from prefect.utilities.annotations import allow_failure
from typing import Optional

@task(retries=0)
def task_a1():
    # Your A1 task logic
    return "A1 result"

@task(retries=0)
def task_a2():
    # Your A2 task logic
    return "A2 result"

@task(retries=0)
def task_b1(a1_result: Optional[str] = None):
    if a1_result is None:
        raise Exception("Upstream task failed")
    # Simulate failure
    raise Exception("B1 failed!")

@task(retries=0)
def task_b2(a2_result: Optional[str] = None):
    if a2_result is None:
        raise Exception("Upstream task failed")
    return "B2 result"

@task(retries=0)
def task_c1(b1_result: Optional[str] = None):
    if b1_result is None:
        raise Exception("Upstream task failed")
    return "C1 result"

@task(retries=0)
def task_c2(b2_result: Optional[str] = None):
    if b2_result is None:
        raise Exception("Upstream task failed")
    return "C2 result"

@task(retries=0)
def task_d(c1_result: Optional[str] = None, c2_result: Optional[str] = None):
    # Task D will run as long as at least one input is valid
    if c1_result is None and c2_result is None:
        raise Exception("All upstream paths failed")
    return "D completed"

@flow
def main_flow():
    # Execute A tasks in parallel
    a1_result = task_a1()
    a2_result = task_a2()
    # Use @allow_failure decorator syntax
    b1_result = task_b1.with_options(allow_failure=True)(a1_result)
    b2_result = task_b2.with_options(allow_failure=True)(a2_result)
    # C tasks depend on B tasks
    c1_result = task_c1.with_options(allow_failure=True)(b1_result)
    c2_result = task_c2.with_options(allow_failure=True)(b2_result)
    # D depends on both C tasks
    final_result = task_d(c1_result, c2_result)
    return final_result

if __name__ == "__main__":
    main_flow()
```
The key change is in how we apply the `allow_failure` option. Instead of wrapping the task call, we use the `.with_options(allow_failure=True)` method. This is the correct way to mark tasks as allowing failure while still showing the proper failed status in the UI.
When a task fails:
- It will show as failed (red) in the UI
- The flow will continue execution
- Downstream tasks will receive `None` for their failed upstream dependencies
- Parallel paths will continue executing
This should resolve the TypeError you encountered while maintaining the desired behavior of showing failed tasks properly in the UI.
Tinh
07/11/2025, 6:30 AM
TypeError: Task.with_options() got an unexpected keyword argument 'allow_failure'
Marvin
07/11/2025, 6:30 AM
Marvin
07/11/2025, 6:32 AM
```python
from prefect import flow, task
from typing import Optional

@task(retries=0)
def task_a1():
    # Your A1 task logic
    return "A1 result"

@task(retries=0)
def task_a2():
    # Your A2 task logic
    return "A2 result"

@task(retries=0)
def task_b1(a1_result: Optional[str] = None):
    if a1_result is None:
        return None
    # Simulate failure
    raise Exception("B1 failed!")

@task(retries=0)
def task_b2(a2_result: Optional[str] = None):
    if a2_result is None:
        return None
    return "B2 result"

@task(retries=0)
def task_c1(b1_result: Optional[str] = None):
    if b1_result is None:
        return None
    return "C1 result"

@task(retries=0)
def task_c2(b2_result: Optional[str] = None):
    if b2_result is None:
        return None
    return "C2 result"

@task(retries=0)
def task_d(c1_result: Optional[str] = None, c2_result: Optional[str] = None):
    # Task D will run as long as at least one input is valid
    if c1_result is None and c2_result is None:
        print("Warning: All upstream paths failed")
    return "D completed"

@flow
def main_flow():
    # Execute A tasks in parallel
    a1_future = task_a1.submit()
    a2_future = task_a2.submit()
    # B tasks depend on A tasks
    try:
        b1_future = task_b1.submit(a1_future)
    except Exception as e:
        print(f"B1 failed: {e}")
        b1_future = None
    try:
        b2_future = task_b2.submit(a2_future)
    except Exception as e:
        print(f"B2 failed: {e}")
        b2_future = None
    # C tasks depend on B tasks
    try:
        c1_future = task_c1.submit(b1_future)
    except Exception as e:
        print(f"C1 failed: {e}")
        c1_future = None
    try:
        c2_future = task_c2.submit(b2_future)
    except Exception as e:
        print(f"C2 failed: {e}")
        c2_future = None
    # D depends on both C tasks
    final_result = task_d.submit(c1_future, c2_future)
    return final_result

if __name__ == "__main__":
    main_flow()
```
Key changes in this version:
1. We use `.submit()` to create futures for each task, which allows for better parallel execution and error handling
2. Each task submission is wrapped in a try-except block to handle failures gracefully
3. When a task fails:
- The exception will be raised and caught
- The task will show as failed (red) in the UI
- The flow will continue execution
- Downstream tasks will receive None for their failed upstream dependencies
4. Tasks check for None inputs to handle upstream failures gracefully
You can test the failure scenario by running the flow - task_b1 will fail (showing red in the UI) but the flow will continue through the parallel path and complete successfully.
To make the failure more visible, you could also add logging:
```python
from prefect import flow, task
from typing import Optional
import logging

logging.basicConfig(level=logging.INFO)
# Note: a plain stdlib logger prints to the console; Prefect's get_run_logger()
# is what sends log lines to the run's logs in the Prefect UI
logger = logging.getLogger(__name__)

@task(retries=0)
def task_b1(a1_result: Optional[str] = None):
    if a1_result is None:
        return None
    logger.error("Task B1 is failing intentionally")
    raise Exception("B1 failed!")
```
This implementation will:
- Show failed tasks as failed in the UI
- Allow the flow to continue execution
- Properly handle task dependencies
- Maintain parallel execution paths
- Provide clear visibility into which tasks failed
Tinh
07/11/2025, 6:33 AM
ValueError: Expected failed or crashed state got NotReady(message="Upstream task run 'f83c83e9-0dc9-4775-b355-7ae6ca4f6b4b' did not reach a 'COMPLETED' state.", type=PENDING, result=None).
Marvin
07/11/2025, 6:33 AM
Marvin
07/11/2025, 6:34 AM
```python
from prefect import flow, task
from typing import Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@task(retries=0)
def task_a1():
    return "A1 result"

@task(retries=0)
def task_a2():
    return "A2 result"

@task(retries=0)
def task_b1(a1_result: Optional[str] = None):
    # Simulate failure
    raise Exception("B1 failed!")

@task(retries=0)
def task_b2(a2_result: Optional[str] = None):
    return "B2 result"

@task(retries=0)
def task_c1(b1_result: Optional[str] = None):
    if b1_result is None:
        logger.info("Skipping C1 due to upstream failure")
        return None
    return "C1 result"

@task(retries=0)
def task_c2(b2_result: Optional[str] = None):
    if b2_result is None:
        logger.info("Skipping C2 due to upstream failure")
        return None
    return "C2 result"

@task(retries=0)
def task_d(c1_result: Optional[str] = None, c2_result: Optional[str] = None):
    if c1_result is None and c2_result is None:
        logger.warning("Both upstream paths failed")
    return "D completed"

@flow
def main_flow():
    # Execute A tasks
    a1_future = task_a1.submit()
    a2_future = task_a2.submit()
    # Wait for A tasks to complete and handle B tasks
    b1_future = None
    b2_future = None
    try:
        # Wait for A1 result and submit B1
        a1_result = a1_future.result()
        b1_future = task_b1.submit(a1_result)
    except Exception as e:
        logger.error(f"B1 path failed: {e}")
    try:
        # Wait for A2 result and submit B2
        a2_result = a2_future.result()
        b2_future = task_b2.submit(a2_result)
    except Exception as e:
        logger.error(f"B2 path failed: {e}")
    # Handle C tasks
    c1_future = None
    c2_future = None
    if b1_future is not None:
        try:
            b1_result = b1_future.result()
            c1_future = task_c1.submit(b1_result)
        except Exception as e:
            logger.error(f"C1 path failed: {e}")
    if b2_future is not None:
        try:
            b2_result = b2_future.result()
            c2_future = task_c2.submit(b2_result)
        except Exception as e:
            logger.error(f"C2 path failed: {e}")
    # Resolve C results
    c1_result = None
    c2_result = None
    if c1_future is not None:
        try:
            c1_result = c1_future.result()
        except Exception as e:
            logger.error(f"Error getting C1 result: {e}")
    if c2_future is not None:
        try:
            c2_result = c2_future.result()
        except Exception as e:
            logger.error(f"Error getting C2 result: {e}")
    # Execute final task
    final_result = task_d.submit(c1_result, c2_result)
    return final_result.result()

if __name__ == "__main__":
    main_flow()
```
Key changes in this version:
1. We use `.result()` to wait for task completion and handle failures explicitly:
```python
try:
    a1_result = a1_future.result()
    b1_future = task_b1.submit(a1_result)
except Exception as e:
    logger.error(f"B1 path failed: {e}")
```
2. We check for `None` futures before attempting to get results:
```python
if b1_future is not None:
    try:
        b1_result = b1_future.result()
        c1_future = task_c1.submit(b1_result)
    except Exception as e:
        logger.error(f"C1 path failed: {e}")
```
3. Each task submission is properly wrapped in error handling
4. The flow maintains proper task dependencies while allowing parallel paths to continue when one fails
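The same "wait for the upstream result, then decide" idea can be sketched without Prefect in a way that also makes skips explicit: each path runs its steps in order, stops at the first failure, and records the remaining steps as `SKIPPED`, so the final report separates failed, skipped, and completed steps. Here `concurrent.futures` stands in for Prefect's task runner, and the step names, lambdas, and `fail_b1` are hypothetical placeholders for A1 through C2.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Tuple

Step = Tuple[str, Callable[[Any], Any]]


def run_path(steps: List[Step]) -> Tuple[Optional[Any], Dict[str, str]]:
    """Run one path in order; after the first failure, mark later steps SKIPPED."""
    statuses: Dict[str, str] = {}
    value: Any = None
    failed = False
    for name, fn in steps:
        if failed:
            statuses[name] = "SKIPPED"
            continue
        try:
            value = fn(value)
            statuses[name] = "COMPLETED"
        except Exception as exc:
            statuses[name] = f"FAILED ({exc})"
            failed = True
    return (None if failed else value), statuses


def fail_b1(_: Any) -> str:
    raise RuntimeError("B1 failed!")  # hypothetical failure


path1: List[Step] = [("A1", lambda _: "a1"), ("B1", fail_b1), ("C1", lambda v: v + "->c1")]
path2: List[Step] = [("A2", lambda _: "a2"), ("B2", lambda v: v + "->b2"), ("C2", lambda v: v + "->c2")]

# The two paths run in parallel; each step waits for its own upstream result
with ThreadPoolExecutor(max_workers=2) as pool:
    f1, f2 = pool.submit(run_path, path1), pool.submit(run_path, path2)
    (c1, s1), (c2, s2) = f1.result(), f2.result()

statuses = {**s1, **s2}
d_result = f"D ran with {[r for r in (c1, c2) if r is not None]}"
statuses["D"] = "COMPLETED"
for name, status in statuses.items():
    print(f"{name}: {status}")
print(d_result)  # D ran with ['a2->b2->c2']
```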
When you run this flow:
- Task B1 will fail and show as failed (red) in the UI
- The parallel path through B2 will continue
- Task C1 will be skipped due to B1's failure
- Task D will execute with whatever results are available
- All states will be properly reflected in the UI
Marvin
07/11/2025, 6:34 AM
Tinh
07/11/2025, 2:26 PM
Marvin
07/11/2025, 2:26 PM
Marvin
07/11/2025, 2:27 PM
```python
from prefect import flow, task
import logging
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Hypothetical switch used by process_b1 to simulate a failure
SIMULATE_B1_FAILURE = True

def make_resilient(task_func, skip_on_none=True):
    """Turn a plain function into a task that is resilient to failures.

    skip_on_none=False lets a task (such as the final task D) run with partial inputs.
    """
    @wraps(task_func)
    def wrapped(*args, **kwargs):
        try:
            # Check if any input is None (indicating upstream failure)
            inputs = list(args) + list(kwargs.values())
            if skip_on_none and any(v is None for v in inputs):
                logger.warning(f"Skipping {task_func.__name__} due to upstream failure")
                return None
            return task_func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Task {task_func.__name__} failed: {e}")
            return None
    # Apply task() last so each task keeps its original function name in the UI
    return task(wrapped)

# Define your task logic as regular functions
def process_a1():
    return "A1 result"

def process_a2():
    return "A2 result"

def process_b1(a1_result):
    # Simulate potential failure
    if SIMULATE_B1_FAILURE:  # replace with your own condition
        raise Exception("B1 failed!")
    return f"B1 processed {a1_result}"

def process_b2(a2_result):
    return f"B2 processed {a2_result}"

def process_c1(b1_result):
    return f"C1 processed {b1_result}"

def process_c2(b2_result):
    return f"C2 processed {b2_result}"

def process_d(c1_result, c2_result):
    results = []
    if c1_result is not None:
        results.append(c1_result)
    if c2_result is not None:
        results.append(c2_result)
    return f"D processed {len(results)} successful paths"

# Convert functions to resilient tasks
task_a1 = make_resilient(process_a1)
task_a2 = make_resilient(process_a2)
task_b1 = make_resilient(process_b1)
task_b2 = make_resilient(process_b2)
task_c1 = make_resilient(process_c1)
task_c2 = make_resilient(process_c2)
# D must accept None inputs so it can process partial results
task_d = make_resilient(process_d, skip_on_none=False)

@flow(name="Resilient Workflow")
def main_flow():
    # Track successful paths
    successful_paths = 0
    failed_paths = 0
    # Execute A tasks
    futures = {
        'a1': task_a1.submit(),
        'a2': task_a2.submit()
    }
    # Execute B tasks
    try:
        futures['b1'] = task_b1.submit(futures['a1'])
    except Exception as e:
        logger.error(f"B1 submission failed: {e}")
        futures['b1'] = None
    try:
        futures['b2'] = task_b2.submit(futures['a2'])
    except Exception as e:
        logger.error(f"B2 submission failed: {e}")
        futures['b2'] = None
    # Execute C tasks
    try:
        futures['c1'] = task_c1.submit(futures['b1'])
    except Exception as e:
        logger.error(f"C1 submission failed: {e}")
        futures['c1'] = None
    try:
        futures['c2'] = task_c2.submit(futures['b2'])
    except Exception as e:
        logger.error(f"C2 submission failed: {e}")
        futures['c2'] = None
    # Get results for final task
    c1_result = None
    c2_result = None
    try:
        if futures['c1'] is not None:
            c1_result = futures['c1'].result()
            if c1_result is not None:
                successful_paths += 1
            else:
                failed_paths += 1
    except Exception:
        failed_paths += 1
    try:
        if futures['c2'] is not None:
            c2_result = futures['c2'].result()
            if c2_result is not None:
                successful_paths += 1
            else:
                failed_paths += 1
    except Exception:
        failed_paths += 1
    # Execute final task
    final_result = task_d.submit(c1_result, c2_result)
    logger.info(f"Workflow completed with {successful_paths} successful and {failed_paths} failed paths")
    return final_result.result()

if __name__ == "__main__":
    main_flow()
```
Key features of this solution:
Marvin
07/11/2025, 2:27 PM
1. The `make_resilient` decorator makes any task handle failures automatically:
- Checks for upstream failures (None inputs)
- Catches and logs any exceptions
- Returns None on failure, allowing downstream tasks to handle the failure gracefully
2. Separation of Concerns:
- Task logic is defined in regular functions
- The resilient behavior is added through the decorator
- Flow logic handles the task orchestration
3. Comprehensive Error Handling:
- Every task submission is wrapped in try-except
- Failed tasks are properly logged
- The flow tracks successful and failed paths
4. Flexible Task Dependencies:
- Tasks can handle None inputs from failed upstream tasks
- Parallel paths continue independently
- Final task can process partial results
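The core of `make_resilient` can be exercised without Prefect. Below is a framework-free sketch with a `skip_on_none` flag (a hypothetical parameter, not part of Prefect). The flag matters for the final step: a wrapper that skips whenever *any* input is `None` would also skip D as soon as one path failed, so D could never process partial results. Plain functions stand in for Prefect tasks here.

```python
import logging
from functools import wraps
from typing import Any, Callable, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("resilient")


def make_resilient(func: Callable[..., Any], skip_on_none: bool = True) -> Callable[..., Optional[Any]]:
    """Wrap func so it skips on None inputs and returns None instead of raising."""
    @wraps(func)
    def wrapped(*args: Any, **kwargs: Any) -> Optional[Any]:
        inputs = list(args) + list(kwargs.values())
        if skip_on_none and any(v is None for v in inputs):
            logger.warning("Skipping %s due to upstream failure", func.__name__)
            return None
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            logger.error("%s failed: %s", func.__name__, exc)
            return None
    return wrapped


def process_b1(a1_result: str) -> str:
    raise RuntimeError("B1 failed!")  # simulated failure


def process_c1(b1_result: str) -> str:
    return f"C1 processed {b1_result}"


def process_d(c1_result: Optional[str], c2_result: Optional[str]) -> str:
    results = [r for r in (c1_result, c2_result) if r is not None]
    return f"D processed {len(results)} successful paths"


task_b1 = make_resilient(process_b1)
task_c1 = make_resilient(process_c1)
# The final step opts out of the None check so it can use partial results
task_d = make_resilient(process_d, skip_on_none=False)

c1 = task_c1(task_b1("A1 result"))
print(task_d(c1, "C2 processed B2"))  # D processed 1 successful paths
```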
To use this pattern with your own tasks:
1. Define your task logic as regular functions:
```python
def your_task_function(input_data):
    # Your task logic here; processed_result stands for whatever it produces
    return processed_result
```
2. Convert it to a resilient task:
```python
resilient_task = make_resilient(your_task_function)
```
3. Use it in your flow:
```python
@flow
def your_flow():
    try:
        # input_data stands for your own input value
        result = resilient_task.submit(input_data)
    except Exception as e:
        logger.error(f"Task submission failed: {e}")
        result = None
```
This solution provides:
- Automatic failure handling for any task
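- Clear visibility of failures in the logs (the tasks themselves still finish as completed, because failures are caught and turned into `None`)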
- Continued workflow execution despite failures
- Proper logging of all failures
- Tracking of successful vs failed paths
The workflow will complete even if any task fails, and you'll have clear visibility into which paths succeeded and which failed.
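Because the resilient tasks turn every failure into `None`, the task runs themselves finish as completed, and failures are counted only in the flow's own bookkeeping. A small helper for that bookkeeping, with the hypothetical name `summarize_paths`, returns the counts plus the names of the failed paths. If you want the flow run itself to stand out, Prefect uses a state returned from a flow function (for example `Failed(message=...)` from `prefect.states`) as the flow run's final state; whether to do that when only some paths fail is a design choice.

```python
from typing import Dict, Optional, Tuple


def summarize_paths(results: Dict[str, Optional[str]]) -> Tuple[int, int, str]:
    """Count successful (non-None) and failed (None) path results."""
    successful = sum(1 for r in results.values() if r is not None)
    failed = len(results) - successful
    failed_names = ", ".join(sorted(k for k, r in results.items() if r is None))
    message = f"{successful} successful and {failed} failed paths"
    if failed_names:
        message += f" (failed: {failed_names})"
    return successful, failed, message


ok, bad, msg = summarize_paths({"path1": None, "path2": "C2 processed B2"})
print(msg)  # 1 successful and 1 failed paths (failed: path1)
```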