Tim-Oliver
11/22/2022, 8:40 AM
@task()
def a_task(i):
    if np.random.rand() > 0.5:
        raise RuntimeError()
    else:
        return i + 1

@flow()
def my_flow():
    futures = a_task.map(range(10))
    # Here I want to check the future states and log for which inputs "i" it failed.
@task()
def log_issues(i, future: PrefectFuture):
    state = future.get_state()
    if state.is_failed():
        get_run_logger().error(f"Failed on input {i}")

@flow()
def my_flow():
    input = range(10)
    futures = a_task.map(input)
    log_issues.map(input, futures)
Would this be the way to go or am I going in a wrong direction?
log_issues should not be a task, because then the future will already be unpacked.
Khuyen Tran
11/22/2022, 4:53 PM
How about returning a Failed state with a custom message?
from prefect import task, flow
import numpy as np
from prefect.orion.schemas.states import Failed

@task()
def a_task(i):
    if np.random.rand() > 0.5:
        return Failed(message=f"Failed on input {i}")
    else:
        return i + 1

@flow()
def my_flow(input):
    return a_task.map(input)

if __name__ == "__main__":
    my_flow(range(10))
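
For the original question (logging which inputs failed from inside the flow rather than from a second task), one possible approach is to pair each input with the final state of its future and keep the failed ones. The sketch below is not from the thread: failed_inputs and StubState are hypothetical names, and StubState only stands in for a Prefect state so the snippet runs without Prefect installed. It assumes the state object exposes is_failed() and a message attribute, and that in Prefect 2.x future.wait() blocks and returns the final state.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class StubState:
    """Hypothetical stand-in for a Prefect state, so this sketch runs without Prefect."""
    failed: bool
    message: Optional[str] = None

    def is_failed(self) -> bool:
        return self.failed


def failed_inputs(inputs: Iterable, states: Iterable) -> List[Tuple[object, Optional[str]]]:
    """Pair each input with its final state and keep only the failed ones."""
    return [
        (i, getattr(state, "message", None))
        for i, state in zip(inputs, states)
        if state.is_failed()
    ]


# Inside a Prefect 2.x flow this might be used roughly as follows
# (assumption, not executed here):
#     inputs = list(range(10))
#     futures = a_task.map(inputs)
#     states = [f.wait() for f in futures]  # wait() returns the final state
#     for i, msg in failed_inputs(inputs, states):
#         get_run_logger().error(f"Failed on input {i}: {msg}")

# Stand-alone demonstration with stub states.
states = [StubState(False), StubState(True, "Failed on input 1"), StubState(False)]
report = failed_inputs([0, 1, 2], states)
print(report)
```

Doing the check in the flow body keeps the futures intact, which sidesteps the concern that passing them to a task would resolve them first. If a_task returns a Failed state with a custom message, that message would be what ends up in the log line.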