Koen
02/07/2025, 10:30 AMfrom prefect import flow, get_run_logger, task, allow_failure
from prefect.states import Failed, Completed
#Custom Exception class that allows to return data
class TaskFailureException(Exception):
def __init__(self, message, data):
super().__init__(message)
self.data = data
@task(task_run_name="task_that_raises: {x}")
def task_that_raises(x):
if x < 3:
# Raise a custom exception with the data you want to preserve
raise TaskFailureException(f"Value is too low: {x}", data=x)
# Return data normally for a Completed state
return x
@task()
def another_task(data):
return (f"got here and showing data from previous task: {data}")
@flow()
def demo_passing_data():
global run_logger
run_logger = get_run_logger()
task_run_future = task_that_raises.submit(x = 1) #task started
try:
result = task_run_future.result() #task fails, jump to except
print(f"Task 2 completed with data: {result}")
except TaskFailureException as e:
print(f"Task failed with exception: {e}")
# Access the data from the exception
print(f"Failed task data: {e.data}")
result = e.data
task_run_future.wait()
# Pass the result data to another_task and establish dependency
future = another_task.submit(data=result)
thistasksresult = future.result()
print(thistasksresult)
return Completed()
if __name__ == "__main__":
demo_passing_data()
Marvin
02/07/2025, 10:30 AMKoen
02/07/2025, 10:32 AMMarvin
02/07/2025, 10:32 AMtask_run_future.wait()
method might not be functioning as you expect because Prefect Futures have a synchronous interface and may not inherently wait for asynchronous tasks to complete. Here are some key points and suggestions for handling task futures properly:
1. Futures Interface: Prefect Futures in version 3.x provide a synchronous interface, and an asynchronous one is planned for future releases. This means that methods like .wait()
might not behave as expected in an asynchronous context.
2. Handling Task Completion:
- Use task_run_future.result()
to retrieve the result of a task. This call will block until the task is completed, effectively waiting for the task to finish.
- If you need to handle asynchronous tasks, ensure you are within an asynchronous context and use await
with any asynchronous methods provided by Prefect, once available.
3. Error Handling: If you are dealing with task failures, consider using try/except blocks to handle exceptions and determine the flow's behavior based on task success or failure.
For more details, you can explore the Prefect documentation on resolving task futures and the futures source code.
Here's a small adjustment to your code to ensure the task result is properly awaited:
python
try:
result = task_run_future.result() # This will block until the task completes
print(f"Task 2 completed with data: {result}")
except TaskFailureException as e:
print(f"Task failed with exception: {e}")
print(f"Failed task data: {e.data}")
result = e.data
This approach ensures that the task's result is awaited synchronously, which should resolve the warning about pending items.