<@ULVA73B9P> I have a flow that creates several ta...
# ask-marvin
p
@Marvin I have a flow that creates several tasks, and at some point their results are used to create others.
```python
logger.info("Build fixture contestants")
fixtures_contestants_ratings_grouped: List[List[FixtureContestantsRatings]] = build_fixture_contestants.map(
    grouped_fixtures,
    unmapped(db_connections)).result()

logger.info("Generate Expected Values")
contestants_evs_grouped: List[List[ContestantEv]] = generate_expected_values.map(
    fixtures_contestants_ratings_grouped,
    unmapped(team_models)).result()
```
What changes do I have to make if I want Prefect to keep running the following task with the results it has?
m
To have Prefect continue running downstream tasks even if an upstream task fails, you can use the `allow_failure` annotation. By default, Prefect will not start a task if one of its upstream inputs failed; wrapping that input in `allow_failure` lets the downstream task run anyway. Here's a brief overview of how you might implement this:
1. Use `allow_failure` on task inputs: Wrap the future of the task that might fail with `allow_failure` when you pass it to the downstream task, either as an argument or through `wait_for`. This ensures that even if the upstream task fails, the flow continues with the subsequent task.
Here's a simplified example of how you can use `allow_failure`:
```python
from prefect import flow, task, allow_failure

@task
def risky_task():
    # Task logic that might fail
    pass

@task
def next_task():
    # Task logic that should run regardless
    pass

@flow
def my_flow():
    risky_future = risky_task.submit()
    # allow_failure lets next_task run even if risky_task ends in a failed state
    next_task(wait_for=[allow_failure(risky_future)])

my_flow()
```
For more details, you can refer to the Prefect documentation or explore the `allow_failure` feature in the Prefect GitHub repository. If you need more specific guidance or examples, feel free to ask!
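One detail worth sketching: as I understand the `allow_failure` docstring, when an input wrapped in `allow_failure` comes from a failed run, the downstream function is handed the raised exception instead of a value. The plain-Python sketch below shows how a downstream task body might branch on that. The function name `summarize_upstream` and the simulated inputs are hypothetical and only illustrate the check; no Prefect call is made here.

```python
from typing import List, Union


def summarize_upstream(upstream: Union[int, BaseException]) -> str:
    """Body a downstream task could use when its input is wrapped in allow_failure."""
    if isinstance(upstream, BaseException):
        # Assumption: a failed upstream run arrives as the exception it raised
        return f"upstream failed: {upstream!r}"
    return f"upstream returned {upstream}"


# Simulated inputs: one normal value and one exception standing in for a failed run
inputs: List[Union[int, BaseException]] = [1, ValueError("boom")]
for item in inputs:
    print(summarize_upstream(item))
```

In a flow, this logic would live inside the downstream task, which would be called with something like `next_task(allow_failure(risky_future))`.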
p
@Marvin what if the next task uses the results of the previous one?
```python
@task
def risky_task(number: int):
    if number == 2:
        raise Exception
    return number

@task
def next_task(number: int):
    # Task logic that should run regardless
    pass


r1 = risky_task.map([1, 2, 3]).result()
r2 = next_task.map(r1).result()
```
How can I make next_task() run even if one of the tasks fails?
m
To handle a scenario where a mapped task might fail, but you still want to run downstream tasks that depend on its results, collect the final state of each mapped run and handle the failed ones before passing results on. `allow_failure` alone is not enough here, because a downstream task given a failed input receives the exception instead of a value. Here's how you might adapt your example:
1. Request states instead of results: Calling `.result()` on the mapped futures raises when one of the runs failed. Passing `return_state=True` to `.map()` returns the final state of every run without raising.
2. Filter or handle failed results: You can filter out the failed results or set a default value to ensure that the downstream tasks can proceed.
Here's a modified version of your code using these concepts:
```python
from prefect import flow, task

@task
def risky_task(number: int):
    if number == 2:
        raise Exception("Failure")
    return number

@task
def next_task(number: int):
    print(f"Processing number: {number}")

@flow
def my_flow():
    # return_state=True returns each mapped run's final state instead of raising
    states = risky_task.map([1, 2, 3], return_state=True)

    # Keep only the results of runs that completed successfully
    successful_results = [state.result() for state in states if state.is_completed()]
    next_task.map(successful_results).wait()

my_flow()
```
In this example, `return_state=True` lets `risky_task` fail for one input without stopping the rest of the flow body. Then, you filter the states and pass only the successful results to `next_task`. For more detailed information, you can explore the Prefect documentation or check the Prefect GitHub repository.
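If you would rather keep one output per input than drop the failed ones, the "set a default value" option can be sketched as a small helper. This is a minimal sketch, assuming only that each state exposes `is_completed()` and `result()`, which Prefect's `State` does. The names `StateLike`, `results_or_default`, and `FakeState` are hypothetical and exist only so the snippet runs without a Prefect server.

```python
from typing import Any, Iterable, List, Protocol


class StateLike(Protocol):
    """Minimal interface this sketch relies on (hypothetical name)."""

    def is_completed(self) -> bool: ...

    def result(self) -> Any: ...


def results_or_default(states: Iterable[StateLike], default: Any = None) -> List[Any]:
    """Return one value per state, substituting `default` for runs that did not complete."""
    return [s.result() if s.is_completed() else default for s in states]


class FakeState:
    """Hypothetical stand-in for a task run state, used only to demo the helper."""

    def __init__(self, value: Any = None, ok: bool = True) -> None:
        self._value = value
        self._ok = ok

    def is_completed(self) -> bool:
        return self._ok

    def result(self) -> Any:
        return self._value


demo = [FakeState(1), FakeState(ok=False), FakeState(3)]
print(results_or_default(demo, default=0))  # [1, 0, 3]
```

Inside a flow, the helper would be given the states from `risky_task.map([1, 2, 3], return_state=True)`, and its output could then be passed to `next_task.map(...)`.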