David Martin Calalang
07/23/2025, 7:02 PM
Marvin
07/23/2025, 7:02 PM
Marvin
07/23/2025, 7:06 PM
python
from prefect import flow, task
import random


@task
def my_task(index):
    # Simulate failure on the 8th iteration (index 7, zero-based)
    if index == 7:
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1}"


@flow
def my_flow():
    # Create an array of 26 elements
    data = list(range(26))
    # Map the task over the data
    results = my_task.map(data)
    # You can handle the results if needed
    try:
        # This will raise an exception due to the failed task
        mapped_results = results.result()
        print("All tasks completed successfully")
    except Exception as e:
        print(f"Flow encountered an error: {e}")
        # You can inspect individual results here if needed


if __name__ == "__main__":
    my_flow()
This code will:
1. Create a task that deliberately fails when it encounters the 8th element (index 7)
2. Map this task across all 26 elements
3. Fail only on the 8th iteration while the other iterations continue normally
You can also make the failure random or conditional, depending on your needs. Here's a variation in which the 8th iteration fails with 20% probability instead of every time:
python
import random

from prefect import task


@task
def my_task_random_failure(index):
    # Simulate failure with 20% probability on the 8th iteration
    if index == 7 and random.random() < 0.2:
        raise ValueError(f"Random failure occurred on iteration {index + 1}")
    return f"Success for iteration {index + 1}"
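If the random failure needs to be repeatable between runs, one option is to draw from a seeded generator instead of the module-level `random` functions. The sketch below is plain Python with no Prefect involved; `should_fail`, `simulate`, and their parameters are hypothetical names used only for illustration.

```python
import random


def should_fail(index, rng, target_index=7, probability=0.2):
    """Return True when the simulated failure should trigger for this index."""
    # The generator is only consulted for the target index.
    return index == target_index and rng.random() < probability


def simulate(seed, n=26):
    # A dedicated, seeded generator makes the outcome identical for a given seed.
    rng = random.Random(seed)
    return [should_fail(i, rng) for i in range(n)]


first = simulate(seed=42)
second = simulate(seed=42)
print(first == second)  # prints True
```

Inside a real task the same check would replace the `random.random() < 0.2` condition, assuming the seed or generator can be made available to the task.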
A few important notes:
1. When using .map(), each iteration runs independently, so the failure of one iteration won't affect the others
2. You can access the result of an individual mapped task by calling .result() on its future
3. The flow continues executing even if individual mapped tasks fail; a failure only surfaces as an exception when you call .result() on the mapped results
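The same "independent iterations, failure surfaces on .result()" pattern can be tried without Prefect. The following is only an analogy built on the standard library's `concurrent.futures`, not Prefect's implementation; `work` and `run_all` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def work(index):
    # Same failure rule as the task above: only index 7 raises.
    if index == 7:
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1}"


def run_all(n=26):
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(work, i) for i in range(n)]
    # Leaving the with-block waits for every submission; nothing has raised yet.
    return futures


futures = run_all()
# exception() reports a failure without re-raising it.
failed = [i for i, f in enumerate(futures) if f.exception() is not None]
print(failed)  # prints [7]
```

Calling `futures[7].result()` would re-raise the `ValueError`, while every other future returns its string normally.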
If you want to inspect the status of individual iterations, you can do something like this:
python
@flow
def my_flow():
    data = list(range(26))
    results = my_task.map(data)
    # Inspect individual results
    for i, future in enumerate(results):
        try:
            result = future.result()
            print(f"Iteration {i + 1}: {result}")
        except Exception as e:
            print(f"Iteration {i + 1} failed: {e}")
This will give you visibility into which iterations succeeded and which failed. The flow will continue to process all iterations, allowing you to observe the behavior of your error handling and see how Prefect handles the failure of specific mapped tasks.
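If a summary is more useful than per-iteration prints, the loop can be folded into a small helper that separates successes from failures. This is a sketch: `split_outcomes` is a hypothetical name, and it only assumes that each future exposes a `.result()` method that raises on failure, as in the loop above. The demo uses already-finished standard-library futures as stand-ins for mapped task runs.

```python
from concurrent.futures import Future


def split_outcomes(futures):
    """Return (successes, failures), keyed by 1-based iteration number."""
    successes, failures = {}, {}
    for i, future in enumerate(futures, start=1):
        try:
            successes[i] = future.result()
        except Exception as exc:
            failures[i] = exc
    return successes, failures


def _done(value=None, error=None):
    # Build an already-finished future to stand in for a mapped task run.
    f = Future()
    if error is not None:
        f.set_exception(error)
    else:
        f.set_result(value)
    return f


demo = [_done("ok"), _done(error=ValueError("boom")), _done("ok")]
successes, failures = split_outcomes(demo)
print(sorted(successes), sorted(failures))  # prints [1, 3] [2]
```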
Would you like me to provide any additional details about handling specific aspects of the failed tasks or about different ways to simulate failures?
Nate
07/23/2025, 7:12 PM
David Martin Calalang
07/23/2025, 7:17 PM
David Martin Calalang
07/23/2025, 7:29 PM
Marvin
07/23/2025, 7:29 PM
Marvin
07/23/2025, 7:30 PM
foo parameter while still simulating failure on the 8th iteration:
python
from prefect import flow, task, unmapped


@task
def my_task(foo, index):
    # Simulate failure on the 8th iteration (index 7, zero-based)
    if index == 7:
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1} with foo={foo}"


@flow
def my_flow():
    # Your constant foo value
    foo_value = "your_foo_value"
    # Create an array of 26 elements
    data = list(range(26))
    # Map the task over the data, keeping foo constant
    results = my_task.map(
        foo=unmapped(foo_value),  # This value stays constant for all iterations
        index=data,  # This gets mapped over
    )
    # Handle results
    for i, future in enumerate(results):
        try:
            result = future.result()
            print(f"Iteration {i + 1}: {result}")
        except Exception as e:
            print(f"Iteration {i + 1} failed: {e}")


if __name__ == "__main__":
    my_flow()
The key points here are:
1. We import unmapped from prefect
2. We wrap the constant parameter (foo) with unmapped()
3. The mapped parameter (index) is passed as is
4. Because the arguments are passed by keyword, their order in .map() does not have to match the order in your task definition
If you need to map over multiple parameters while keeping some constant, you can mix and match mapped and unmapped parameters. For example:
python
from prefect import flow, task, unmapped


@task
def complex_task(foo, index, other_param):
    if index == 7:
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1} with foo={foo} and other_param={other_param}"


@flow
def complex_flow():
    foo_value = "constant_foo"
    other_value = "constant_other"
    data = list(range(26))
    results = complex_task.map(
        foo=unmapped(foo_value),  # constant
        index=data,  # mapped
        other_param=unmapped(other_value),  # constant
    )
The unmapped() function tells Prefect to use the same value for that parameter across all iterations, while the non-unmapped parameters are iterated over.
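To make the "same value for every iteration" idea concrete, here is a plain-Python sketch of how constant and mapped keyword arguments could be expanded into one call per iteration. It is not Prefect's implementation: `Constant` and `expand_calls` are hypothetical stand-ins, and rejecting mapped arguments of different lengths is a choice made for this sketch.

```python
class Constant:
    """Hypothetical marker: wrap a value that should be reused for every call."""

    def __init__(self, value):
        self.value = value


def expand_calls(**params):
    """Turn keyword arguments into one kwargs dict per iteration."""
    mapped = {k: list(v) for k, v in params.items() if not isinstance(v, Constant)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) > 1:
        raise ValueError("mapped arguments must have the same length")
    n = lengths.pop() if lengths else 0
    calls = []
    for i in range(n):
        call = {}
        for name, value in params.items():
            # Constants are repeated; mapped arguments are indexed per iteration.
            call[name] = value.value if isinstance(value, Constant) else mapped[name][i]
        calls.append(call)
    return calls


calls = expand_calls(foo=Constant("constant_foo"), index=range(3))
print(calls[1])  # prints {'foo': 'constant_foo', 'index': 1}
```

Each dict in `calls` corresponds to the arguments one iteration would receive, with `foo` identical in all of them.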
Would you like me to explain any other aspects of using unmapped() or handling the results of your mapped task?