Hello,
Can I get the input arguments of a task-future?
Tim-Oliver
11/22/2022, 8:41 AM
Say I have a task which is mapped over a list:
import numpy as np
from prefect import flow, task

@task()
def a_task(i):
    # Fail at random for roughly half of the inputs.
    if np.random.rand() > 0.5:
        raise RuntimeError()
    else:
        return i + 1

@flow()
def my_flow():
    futures = a_task.map(range(10))
    # Here I want to check the future states and log for which inputs "i" it failed.
Tim-Oliver
11/22/2022, 8:51 AM
I guess I can do this:
from prefect import flow, get_run_logger, task
from prefect.futures import PrefectFuture

@task()
def log_issues(i, future: PrefectFuture):
    state = future.get_state()
    if state.is_failed():
        get_run_logger().error(f"Failed on input {i}")

@flow()
def my_flow():
    inputs = range(10)
    futures = a_task.map(inputs)
    log_issues.map(inputs, futures)
Would this be the way to go, or am I heading in the wrong direction?
Tim-Oliver
11/22/2022, 9:21 AM
Okay, `log_issues` should not be a task, because then the `future` will already be unpacked.
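A minimal sketch of that idea: a plain function (not a task) that pairs each input with its future and checks the state. It relies only on the `get_state()` and `is_failed()` calls used above. `failed_inputs`, `StubState`, and `StubFuture` are hypothetical names, and the stubs stand in for real Prefect futures so the snippet runs without Prefect installed.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List


def failed_inputs(inputs: Iterable[Any], futures: Iterable[Any]) -> List[Any]:
    """Return the inputs whose paired future ended in a failed state."""
    failed = []
    for value, future in zip(inputs, futures):
        # Called from plain Python rather than from a task, so `future`
        # is still a future here and its state can be inspected.
        if future.get_state().is_failed():
            failed.append(value)
    return failed


# Hypothetical stand-ins for Prefect's state and future objects.
@dataclass
class StubState:
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


@dataclass
class StubFuture:
    state: StubState

    def get_state(self) -> StubState:
        return self.state


inputs = [0, 1, 2, 3]
# Mark the odd inputs as failed for the demonstration.
futures = [StubFuture(StubState(failed=(i % 2 == 1))) for i in inputs]
print(failed_inputs(inputs, futures))  # prints [1, 3]
```

Inside a real flow, the helper would presumably be called as `failed_inputs(inputs, futures)` right after `a_task.map(inputs)`, with each input logged via `get_run_logger()`. Whether the futures have to be waited on first so that the state is final is an assumption worth checking against the Prefect version in use.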
Khuyen Tran
11/22/2022, 4:53 PM
I see what you mean. I’ll see if I can find any solutions for this
Khuyen Tran
11/22/2022, 5:31 PM
Can you just return the `Failed` state with a custom message?
from prefect import task, flow
import numpy as np
from prefect.orion.schemas.states import Failed

@task()
def a_task(i):
    if np.random.rand() > 0.5:
        return Failed(message=f"Failed on input {i}")
    else:
        return i + 1

@flow()
def my_flow(input):
    return a_task.map(input)

if __name__ == "__main__":
    my_flow(range(10))
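With the input embedded in each failure message, the messages could then be read back from the task-run states. The sketch below assumes each state exposes `is_failed()` and a `message` attribute, which is how the `Failed(message=...)` call above suggests it behaves. `failure_messages` and `StubState` are hypothetical names, and how the states are obtained (for example from the futures returned by `a_task.map`) is left open.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional


def failure_messages(states: Iterable[Any]) -> List[str]:
    """Collect the message of every failed state, skipping the others."""
    return [state.message or "" for state in states if state.is_failed()]


# Hypothetical stand-in for a Prefect state object.
@dataclass
class StubState:
    failed: bool
    message: Optional[str] = None

    def is_failed(self) -> bool:
        return self.failed


states = [
    StubState(failed=False),
    StubState(failed=True, message="Failed on input 1"),
    StubState(failed=False),
    StubState(failed=True, message="Failed on input 3"),
]
for message in failure_messages(states):
    print(message)
```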