Christian Sicari
06/04/2024, 3:38 PM
from random import randint
from prefect import task, flow

class DummyError(Exception):
    pass

@task
def say_hello():
    print("Hello, world!")

@task
def t1():
    print("t1")

@task
def t2():
    print("t2")

@task
def t3():
    raise DummyError("t3")

@task
def mapper():
    task_id = randint(1, 2)
    task_instance = globals()[f"t{task_id}"]  # dynamically select a task
    return [
        task_instance.submit(),
        t3.submit()
    ]

@task
def reducer(futures):
    for future in futures:
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")

@flow
def myflow():
    say_hello()
    futures = mapper()
    reducer(futures)
    say_hello()
    return "OK"

if __name__ == "__main__":
    result = myflow()
I am trying to submit multiple dynamic tasks and then gather all the results together in another task called reducer. If one of the tasks fails, I just want to ignore it.
When I run this workflow, I get this error:
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
Marvin
06/04/2024, 3:38 PM
You are getting the UnfinishedRun error because one of your dynamically submitted tasks is still in a PENDING state when you try to retrieve its result. A failing task should not crash your flow, so the problem appears to be in the error handling rather than in the failure itself. To ignore failures of dynamically submitted tasks, check the state of each task before attempting to get its result.
Here's an improved version of your flow where we handle the possible task states correctly:
python
from random import randint
from prefect import task, flow
from prefect.states import Completed, Failed

class DummyError(Exception):
    pass

@task
def say_hello():
    print("Hello, world!")

@task
def t1():
    print("t1")

@task
def t2():
    print("t2")

@task
def t3():
    raise DummyError("t3")

@task
def mapper():
    task_id = randint(1, 2)
    task_instance = globals()[f"t{task_id}"]  # dynamically select a task
    return [
        task_instance.submit(),
        t3.submit()
    ]

@task
def reducer(futures):
    for future in futures:
        # Only read the result once the run has reached a terminal state
        if future.get_state().is_final():
            try:
                future.result()
            except Exception as e:
                print(f"Error: {e}")
        else:
            print(f"Task {future.name} did not finish and cannot provide a result.")

@flow
def myflow():
    say_hello()
    futures = mapper()
    reducer(futures)
    say_hello()
    return "OK"

if __name__ == "__main__":
    result = myflow()
### Explanation:
- Check if Task Completed: Before retrieving the result of a future, the code checks whether the task's state is final using future.get_state().is_final().
- Handle Exceptions: If the task did finish, it attempts to get the result and catches any exceptions, printing the error.
- Report Unfinished Tasks: If the task didn't finish, it prints a message indicating so.
This approach should help you gracefully handle errors and avoid the UnfinishedRun exception. For more details and improvements, you can follow discussions and updates here and here.
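The three steps in the explanation (check whether the run finished, read the result inside a try/except, report anything unfinished) can also be seen in isolation. The sketch below is a plain-Python analogy built on the standard library's concurrent.futures, not Prefect's API; the names ok, boom, and reduce_ignoring_failures are hypothetical and exist only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, wait


class DummyError(Exception):
    pass


def ok():
    return "ok"


def boom():
    raise DummyError("t3")


def reduce_ignoring_failures(futures):
    """Collect results from finished futures; record failures instead of raising."""
    results, errors = [], []
    for future in futures:
        if not future.done():
            # Roughly analogous to a run that is still PENDING: nothing to read yet.
            errors.append("not finished")
            continue
        try:
            results.append(future.result())
        except Exception as exc:
            # The failure is noted and otherwise ignored.
            errors.append(f"Error: {exc}")
    return results, errors


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(ok), pool.submit(boom)]
    wait(futures)  # block until every future has finished
    results, errors = reduce_ignoring_failures(futures)

print(results, errors)
```

Waiting for every future before reducing is what keeps the "not finished" branch from firing here; without that wait, a future could still be running when the reducer inspects it.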
Hope this assists you.
Would you like links to the documentation?
Christian Sicari
06/04/2024, 3:40 PM