<@ULVA73B9P> I have a Prefect flow that calls the ...
# ask-marvin
d
@Marvin I have a Prefect flow that calls the same task 26 times (uses .map() to iterate over an array of elements). I want to simulate and observe what happens if for example, on the 8th iteration, the task fails/is incomplete. How would I simulate this?
m
Based on the research, I can help you simulate a failure on the 8th iteration of your mapped task. Here's how you can do it:
python
from prefect import flow, task
import random

@task
def my_task(index):
    # Simulate failure on the 8th iteration (index 7 if 0-based)
    if index == 7:  # for 8th iteration
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1}"

@flow
def my_flow():
    # Create an array of 26 elements
    data = list(range(26))
    
    # Map the task over the data
    results = my_task.map(data)
    
    # You can handle the results if needed
    try:
        # This will raise an exception due to the failed task
        mapped_results = results.result()
        print("All tasks completed successfully")
    except Exception as e:
        print(f"Flow encountered an error: {e}")
        # You can inspect individual results here if needed

if __name__ == "__main__":
    my_flow()
This code will:
1. Create a task that deliberately fails when it encounters the 8th element (index 7)
2. Map this task across all 26 elements
3. Fail only on the 8th iteration, while the other iterations continue normally

You can also make the failure random or conditional, depending on your needs. Here's a variation that fails on the 8th iteration with a 20% probability:
python
@task
def my_task_random_failure(index):
    # Simulate failure with 20% probability on the 8th iteration
    if index == 7 and random.random() < 0.2:
        raise ValueError(f"Random failure occurred on iteration {index + 1}")
    return f"Success for iteration {index + 1}"
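If you want the injected failure to be configurable and repeatable across runs, one option is to move the decision into a small plain-Python helper and call it from inside the task body. This is only a sketch: `should_fail`, `run_iteration`, `fail_on`, `probability`, and `seed` are hypothetical names, not part of Prefect. It seeds a private `random.Random` per index so the same indices fail on every run.

```python
import random


def should_fail(index, fail_on=frozenset({7}), probability=1.0, seed=0):
    """Decide whether the iteration at `index` should raise a simulated failure.

    Hypothetical helper (not a Prefect API): `fail_on` holds the 0-based
    indices that are candidates for failure, and `probability` is the chance
    that a candidate actually fails.
    """
    if index not in fail_on:
        return False
    # Seed a private generator from (seed, index) so reruns give the same outcome.
    rng = random.Random(f"{seed}-{index}")
    return rng.random() < probability


def run_iteration(index):
    """Body you could place inside a task: raise on the chosen indices."""
    if should_fail(index):
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1}"
```

With the defaults this behaves like the deterministic example (index 7 always fails); lowering `probability` gives the random variant while keeping runs reproducible for a given `seed`.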
A few important notes:
1. When using `.map()`, each iteration runs independently, so the failure of one iteration won't affect the others
2. You can access the results of individual mapped tasks by calling `.result()` on each future
3. The flow will continue executing even if individual mapped tasks fail, unless you explicitly call `.result()` on the mapped results, which re-raises the failure

If you want to inspect the status of individual iterations, you can do something like this:
python
@flow
def my_flow():
    data = list(range(26))
    results = my_task.map(data)
    
    # Inspect individual results
    for i, future in enumerate(results):
        try:
            result = future.result()
            print(f"Iteration {i + 1}: {result}")
        except Exception as e:
            print(f"Iteration {i + 1} failed: {str(e)}")
This will give you visibility into which iterations succeeded and which failed. The flow still processes all iterations, so you can observe your error handling and see how Prefect handles the failure of specific mapped tasks.

Would you like more detail on handling the failed tasks, or on other ways to simulate failures?
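To turn that kind of loop into a summary you can check, you could collect the outcomes into two dictionaries. The sketch below uses the standard library's `concurrent.futures` as a stand-in for Prefect futures, purely so it runs without Prefect installed; it is not how Prefect executes tasks. The only shared assumption is that each future's `.result()` re-raises the exception from its call. `simulated_task` and `collect_outcomes` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def simulated_task(index):
    # Same failure rule as my_task: fail on the 8th iteration (index 7).
    if index == 7:
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1}"


def collect_outcomes(futures):
    """Split futures into successes and failures, keyed by 1-based iteration number."""
    successes, failures = {}, {}
    for i, future in enumerate(futures, start=1):
        try:
            successes[i] = future.result()
        except Exception as exc:
            failures[i] = exc
    return successes, failures


with ThreadPoolExecutor(max_workers=4) as pool:
    # Stand-in for my_task.map(data): submit one call per element.
    futures = [pool.submit(simulated_task, i) for i in range(26)]
    successes, failures = collect_outcomes(futures)

print(f"{len(successes)} succeeded, failed iterations: {sorted(failures)}")
```

Keeping the exceptions themselves in `failures`, instead of just printing them, makes it easier to assert on the failure type when you test your error handling.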
n
i think i wrote a section on this, maybe i need to reindex the docs - one sec
d
thanks!
@Marvin I want to use your index method to produce an error on the 8th iteration. However, my task requires another parameter 'foo', so the parameters of my task are (foo, index). How would I use the .map() function in this case?
m
Here's how you can modify the previous example to include your `foo` parameter while still simulating failure on the 8th iteration:
python
from prefect import flow, task, unmapped  # unmapped is exported from the top-level prefect package

@task
def my_task(foo, index):
    # Simulate failure on the 8th iteration (index 7 if 0-based)
    if index == 7:  # for 8th iteration
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1} with foo={foo}"

@flow
def my_flow():
    # Your constant foo value
    foo_value = "your_foo_value"
    
    # Create an array of 26 elements
    data = list(range(26))
    
    # Map the task over the data, keeping foo constant
    results = my_task.map(
        foo=unmapped(foo_value),  # This value stays constant for all iterations
        index=data                # This gets mapped over
    )
    
    # Handle results
    for i, future in enumerate(results):
        try:
            result = future.result()
            print(f"Iteration {i + 1}: {result}")
        except Exception as e:
            print(f"Iteration {i + 1} failed: {str(e)}")

if __name__ == "__main__":
    my_flow()
The key points here are:
1. We import `unmapped`, which is exported from the top-level `prefect` package
2. We wrap the constant parameter (`foo`) with `unmapped()`
3. The mapped parameter (`index`) is passed as is
4. Because the arguments are passed to `.map()` by keyword, their order does not need to match the order in your task definition

If your task takes more parameters, you can mix mapped and unmapped arguments in the same call. For example:
python
@task
def complex_task(foo, index, other_param):
    if index == 7:
        raise ValueError(f"Simulated failure on iteration {index + 1}")
    return f"Success for iteration {index + 1} with foo={foo} and other_param={other_param}"

@flow
def complex_flow():
    foo_value = "constant_foo"
    other_value = "constant_other"
    data = list(range(26))
    
    results = complex_task.map(
        foo=unmapped(foo_value),      # constant
        index=data,                   # mapped
        other_param=unmapped(other_value)  # constant
    )
The `unmapped()` wrapper tells Prefect to use the same value for that parameter across all iterations, while the parameters that are not wrapped are iterated over.

Would you like me to explain any other aspects of using `unmapped()` or handling the results of your mapped task?
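As a mental model, you can think of the mapped call as expanding into one set of keyword arguments per element, with the constant values repeated. The plain-Python sketch below illustrates that expansion only; `Constant` and `expand_calls` are hypothetical stand-ins, not Prefect's `unmapped` or its actual implementation.

```python
class Constant:
    """Hypothetical marker for a value that should be reused for every call."""

    def __init__(self, value):
        self.value = value


def expand_calls(**kwargs):
    """Expand mapped and constant keyword arguments into one kwargs dict per call."""
    mapped = {k: list(v) for k, v in kwargs.items() if not isinstance(v, Constant)}
    constants = {k: v.value for k, v in kwargs.items() if isinstance(v, Constant)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("Need at least one mapped argument, and all must have the same length")
    (n,) = lengths
    # Each call gets every constant plus the i-th element of each mapped argument.
    return [{**constants, **{k: v[i] for k, v in mapped.items()}} for i in range(n)]


calls = expand_calls(foo=Constant("your_foo_value"), index=range(26))
print(calls[7])
```

Because everything is passed by keyword, swapping the order of `foo` and `index` in the call produces the same list of per-iteration arguments.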