https://prefect.io logo
Title
t

Tim-Oliver

11/22/2022, 8:40 AM
Hello, Can I get the input arguments of a task-future?
👀 1
Say I have a task which is mapped over a list:
@task()
def a_task(i):
    if np.random.rand() > 0.5:
        raise RunTimeException()
    else:
        return i + 1

@flow()
def my_flow():
    futures = a_task.map(range(10))

    # Here I want to check the future states and log for which inputs "i" it failed.
I guess I can do this:
@task()
def log_issues(i, future: PrefectFuture):
    state = future.get_state()
    if state.is_failed():
        get_run_logger().error(f"Failed on inpute {i}")

@flow()
def my_flow():
    input = range(10)
    futures = a_task.map(input)
    log_issues.map(input, future)
Would this be the way to go or am I going in a wrong direction?
Okay,
log_issues
should not be a task, because then the
future
will already be unpacked.
k

Khuyen Tran

11/22/2022, 4:53 PM
I see what you mean. I’ll see if I can find any solutions for this
Can you just return the
Failed
state with a custom message?
from prefect import task, flow
import numpy as np
from prefect.orion.schemas.states import Failed


@task()
def a_task(i):
    if np.random.rand() > 0.5:
        return Failed(message=f"Failed on input {i}")
    else:
        return i + 1


@flow()
def my_flow(input):
    return a_task.map(input)

if __name__ == "__main__":
    my_flow(range(10))
👀 1
🙏 1