Koen
02/07/2025, 3:15 PMfrom prefect import flow, get_run_logger, task, allow_failure
from prefect.states import Failed, Completed, StateType
#Custom Exception class that allows to return data
class TaskFailureException(Exception):
def __init__(self, message, data):
super().__init__(message)
self.data = data
@task(task_run_name="task_that_raises: {x}")
def task_that_raises(x):
if x < 3:
# Raise a custom exception with the data you want to preserve
raise TaskFailureException(f"Value is too low: {x}", data=x)
# Return data normally for a Completed state
return x
@task()
def another_task(data):
return (f"got here and showing data from previous task: {data}")
@task()
def third_task(data):
return 5/data
@flow()
def demo_passing_data():
global run_logger
run_logger = get_run_logger()
task_run_future = task_that_raises.submit(x = 1) #task started
result = task_run_future.result(raise_on_failure=False) #task fails, jump to except
if task_run_future.state.is_failed():
print(f"Task 2 failed with data: {result.data}")
test = result.data
print(test)
else:
print(f"Task 2 completed with data: {result}")
test = result
# Pass the result data to another_task and establish dependency
future = another_task.submit(data=test, wait_for=result)
thistasksresult = future.result()
#wait_for creates dependency or pass result to create dependency
thirdtask = third_task(data=result, wait_for=result, return_state=True) #if you omit return_state=True, a failing task will fail the flow
if thirdtask.is_failed():
return Failed(message='flow fails because task failed')
return Completed()
if __name__ == "__main__":
demo_passing_data()
Marvin
02/07/2025, 3:15 PMx=10
may be related to the way dependencies are established in your code. Here's a breakdown of what might be happening:
1. Dependency Management: In Prefect, dependencies between tasks are established either by passing task outputs as inputs to other tasks or by using the wait_for
parameter. In your code, you're using wait_for=result
in the another_task
and third_task
submissions, which should establish a dependency on the result of the previous task.
2. State Handling: When x=1
, the task fails, and you're handling the failure by checking task_run_future.state.is_failed()
. This seems to work as expected. However, when x=10
, the task completes successfully, and the dependencies should still be established. The issue might be with how the dependencies are visualized or logged.
3. Visualization and Logging: If the dependency lines are not being drawn, ensure that the logging or visualization tool you're using is correctly configured to show task dependencies. It's possible that the visualization tool only shows dependencies for failed tasks or there's a configuration issue.
Here's a simplified version of your code to make sure dependencies are established correctly:
python
from prefect import flow, get_run_logger, task
@task(task_run_name="task_that_raises: {x}")
def task_that_raises(x):
if x < 3:
raise ValueError(f"Value is too low: {x}")
return x
@task()
def another_task(data):
return f"got here and showing data from previous task: {data}"
@task()
def third_task(data):
return 5 / data
@flow()
def demo_passing_data():
run_logger = get_run_logger()
task_run_future = task_that_raises.submit(x=10)
result = task_run_future.result(raise_on_failure=False)
if task_run_future.state.is_failed():
<http://run_logger.info|run_logger.info>(f"Task failed with data: {result}")
test = result
else:
<http://run_logger.info|run_logger.info>(f"Task completed with data: {result}")
test = result
future = another_task.submit(data=test, wait_for=[task_run_future])
thistasksresult = future.result()
thirdtask = third_task.submit(data=test, wait_for=[task_run_future])
if thirdtask.state.is_failed():
return "Flow fails because task failed"
return "Completed"
if __name__ == "__main__":
demo_passing_data()
Make sure your visualization tool is set to show all task dependencies, not just those that involve failures. If the issue persists, consider checking the Prefect documentation on task dependencies and ensure your environment is up to date.