Patricio Navarro
01/22/2025, 3:21 PM<http://logger.info|logger.info>("Build fixture contestants")
fixtures_contestants_ratings_grouped: List[List[FixtureContestantsRatings]] = build_fixture_contestants.map(
grouped_fixtures,
unmapped(db_connections)).result()
<http://logger.info|logger.info>("Generate Expected Values")
contestants_evs_grouped: List[List[ContestantEv]] = generate_expected_values.map(
fixtures_contestants_ratings_grouped,
unmapped(team_models)).result()
Which changes I have to make to make if I want Prefect to keep running the following task with the results it have?Marvin
01/22/2025, 3:21 PMallow_failure
feature. This allows you to specify that certain tasks can fail without affecting the overall flow execution. Here's a brief overview of how you might implement this:
1. Use allow_failure
with Tasks:
You can wrap tasks that might fail with allow_failure
. This will ensure that even if the task fails, the flow will continue executing the subsequent tasks.
Here's a simplified example of how you can use `allow_failure`:
python
from prefect import flow, task, allow_failure
@task
def risky_task():
# Task logic that might fail
pass
@task
def next_task():
# Task logic that should run regardless
pass
@flow
def my_flow():
result = allow_failure(risky_task)() # This task can fail without stopping the flow
next_task()
my_flow()
For more details, you can refer to the Prefect documentation or explore the allow_failure
feature in the Prefect GitHub repository.
If you need more specific guidance or examples, feel free to ask!Patricio Navarro
01/22/2025, 3:27 PM@task
def risky_task(number:int):
if number==2:
raise Exception
return number
@task
def next_task(number:int):
# Task logic that should run regardless
pass
r1 = risky_task.map([1,2,3]).result()
r2 = next_task.map(r1).result()
How can I make next_task() to run despite one of one of the task fails?Marvin
01/22/2025, 3:30 PMallow_failure
in combination with handling the results appropriately. Here's how you might adapt your example:
1. Use allow_failure
: This will allow the task to fail without stopping the flow. However, you need to handle the results to ensure that the downstream task can process them correctly.
2. Filter or Handle Failed Results: You can filter out the failed results or set a default value to ensure that the downstream tasks can proceed.
Here's a modified version of your code using these concepts:
python
from prefect import flow, task, allow_failure
@task
def risky_task(number: int):
if number == 2:
raise Exception("Failure")
return number
@task
def next_task(number: int):
print(f"Processing number: {number}")
@flow
def my_flow():
# Allow risky_task to fail and capture the results
results = allow_failure(risky_task.map([1, 2, 3]))
# Filter out failed results or handle them as needed
successful_results = [result for result in results if result.is_successful()]
next_task.map(successful_results)
my_flow()
In this example, allow_failure
is used to let risky_task
fail without halting the flow. Then, you filter the results to only pass successful ones to next_task
.
For more detailed information, you can explore the Prefect documentation or check the Prefect GitHub repository.