Koen
02/11/2025, 1:53 PM
Marvin
02/11/2025, 1:53 PM
Koen
02/11/2025, 1:54 PM
```python
from prefect import flow, get_run_logger, task
from prefect.states import Failed, Completed

# Custom exception class that carries data
class TaskFailureException(Exception):
    def __init__(self, message, data):
        super().__init__(message)
        self.data = data

@task(task_run_name="task_that_raises: {x}")
def task_that_raises(x):
    if x < 3:
        # Raise a custom exception with the data you want to preserve
        raise TaskFailureException(f"Value is too low: {x}", data=x)
    # Return data normally for a Completed state
    return x

@task()
def another_task(data):
    return (f"got here and showing data from previous task: {data}")

@flow()
def demo_passing_data():
    global run_logger
    run_logger = get_run_logger()
    fourth_task = task_that_raises(x=1, return_state=True)
    print("hey i'm here")
    if fourth_task.is_failed():
        print("now I'm here")
        print(f"{fourth_task.data}")
        print(f"{fourth_task.data.result.data}")
    fifth_task = another_task(data=10, wait_for=[fourth_task])
    print(fifth_task)
    return Completed()

if __name__ == "__main__":
    demo_passing_data()
```
Marvin
02/11/2025, 1:54 PM
…`wait_for` parameter and the state management of the `fourth_task`. Here's a breakdown of what's happening:
1. Task State Management: In Prefect, tasks have states such as Pending, Running, Completed, and Failed. If a task fails, its state is set to Failed, and any downstream task that depends on it through `wait_for` will not run unless the upstream task completes successfully.
2. Custom Exception Handling: In your `task_that_raises`, you're raising a `TaskFailureException` when `x < 3`, which sets the task's state to Failed. This prevents `another_task` from starting, because it is waiting for `fourth_task` to complete successfully.
3. Using `wait_for`: The `wait_for` parameter specifies that a task should wait for other tasks to complete before starting. If any of those tasks fail, the task with `wait_for` will not start.
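The gating rule in points 1 to 3 can be sketched as a toy model in plain Python. This is not Prefect's implementation: the `State` enum and the `run_with_wait_for` helper are hypothetical names that only mimic the behavior described above, where a downstream body runs only if every upstream state is Completed.

```python
from enum import Enum
from typing import Any, Callable, Optional, Sequence, Tuple


class State(Enum):
    # Hypothetical, simplified task states (not Prefect's State class)
    COMPLETED = "Completed"
    FAILED = "Failed"
    NOT_STARTED = "NotStarted"


def run_with_wait_for(
    fn: Callable[[], Any], wait_for: Sequence[State]
) -> Tuple[State, Optional[Any]]:
    """Toy model of wait_for: run fn only if every upstream state is COMPLETED."""
    if any(upstream is not State.COMPLETED for upstream in wait_for):
        # An upstream task did not complete, so fn is never called
        return State.NOT_STARTED, None
    try:
        return State.COMPLETED, fn()
    except Exception as exc:
        # A raising body becomes a FAILED state that carries the exception
        return State.FAILED, exc


if __name__ == "__main__":
    # Mirrors the thread: the upstream task failed, so the downstream one is skipped
    state, value = run_with_wait_for(lambda: "got here", wait_for=[State.FAILED])
    print(state.name, value)  # NOT_STARTED None

    # With no failed upstream in wait_for, the body runs
    state, value = run_with_wait_for(lambda: "got here", wait_for=[])
    print(state.name, value)  # COMPLETED got here
```

In real Prefect code the engine makes this decision; the toy only illustrates that a failed upstream listed in `wait_for` keeps the downstream body from executing, no matter what `data` value is passed.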
To resolve this, you can modify your flow to handle failed states in a way that allows downstream tasks to proceed. Here's a suggestion:
- Use a try-except block to catch the custom exception and handle it appropriately, possibly setting a default value or using a fallback task.
Here's a conceptual example:
```python
from prefect import flow, get_run_logger, task
from prefect.states import Completed


class TaskFailureException(Exception):
    def __init__(self, message, data):
        super().__init__(message)
        self.data = data


@task(task_run_name="task_that_raises: {x}")
def task_that_raises(x):
    if x < 3:
        # Raise a custom exception that carries the value to preserve
        raise TaskFailureException(f"Value is too low: {x}", data=x)
    return x


@task()
def another_task(data):
    return f"got here and showing data from previous task: {data}"


@flow()
def demo_passing_data():
    run_logger = get_run_logger()
    try:
        # Called without return_state=True, a failing task raises its exception here
        fourth_task = task_that_raises(x=1)
    except TaskFailureException as e:
        run_logger.warning(f"Task failed with data: {e.data}")
        # Optionally set a default or handle the failure
        fourth_task_result = e.data
    else:
        fourth_task_result = fourth_task
    # No wait_for on the failed upstream: pass the recovered value directly
    fifth_task = another_task(data=fourth_task_result)
    print(fifth_task)
    return Completed()


if __name__ == "__main__":
    demo_passing_data()
```
This way, you handle the failure case and allow `another_task` to proceed with a fallback or default value. For more on task dependencies and state management, you might find these resources helpful:
- Task Engine
- Tasks
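One caveat about the `TaskFailureException` pattern used in both snippets, hedged because whether it matters depends on how results are stored: Python rebuilds an unpickled exception by calling its class with `self.args`, which here holds only the message. A required `data` parameter therefore makes unpickling fail, which would matter if the exception is ever serialized (for example, to move it between processes). The sketch below is plain Python; the class names and the `data=None` default are suggestions for illustration, not code from the thread.

```python
import pickle


class FragileTaskFailure(Exception):
    # Same shape as TaskFailureException in the thread: `data` is required
    def __init__(self, message, data):
        super().__init__(message)
        self.data = data


class PicklableTaskFailure(Exception):
    # Suggested variant: `data` has a default, so unpickling can call cls(message)
    # and then restore `data` from the pickled instance __dict__
    def __init__(self, message, data=None):
        super().__init__(message)
        self.data = data


def round_trip(exc: Exception) -> Exception:
    """Serialize and deserialize an exception with pickle."""
    return pickle.loads(pickle.dumps(exc))


if __name__ == "__main__":
    try:
        round_trip(FragileTaskFailure("Value is too low: 1", data=1))
    except TypeError as err:
        # Unpickling calls FragileTaskFailure("Value is too low: 1") without `data`
        print(f"fragile: {err}")

    restored = round_trip(PicklableTaskFailure("Value is too low: 1", data=1))
    print(f"picklable: {restored} data={restored.data}")  # picklable: Value is too low: 1 data=1
```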