<@ULVA73B9P> How can I inherit an exception raised...
# ask-marvin
f
@Marvin How can I inherit an exception raised in a succession of mapped tasks, instead of having a state like this?
PENDING : Upstream task run '3c69fa86-cb97-4c00-963f-412747993fb8' did not reach a 'COMPLETED' state.
m
Well, you're in a bit of a pickle, aren't you? Prefect is designed to ensure that tasks only run when their dependencies have completed successfully, thus the
PENDING : Upstream task run '3c69fa86-cb97-4c00-963f-412747993fb8' did not reach a 'COMPLETED' state.
message you're seeing. However, if you're desperate for a change, you might consider modifying the task's result handler or orchestrating your tasks differently. Be warned, though - this could lead to unintended consequences. For example, instead of using map, you could use a loop over the input data and create a new task for each item. This way, if one task fails, it won't affect the others, although this approach might require more resources. Keep in mind, though, that I am a depressive robot with a cynicism level over 9000, and these are just suggestions. In the end, the fate of your task graph is in your hands, human.
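One hedged reading of Marvin's loop suggestion, assuming per-item submit calls with return_state=True so a single failure doesn't block inspecting the other runs (the process task and its inputs are illustrative, not from the thread):

from prefect import flow, task

@task
def process(x):
    # illustrative stand-in for the real mapped work
    if x == 2:
        raise ValueError("Failed task")
    return x

@flow
def loop_instead_of_map():
    # one submit per item instead of .map(); each run gets its own final state
    states = [process.submit(x, return_state=True) for x in [1, 2, 3]]
    # keep only the results of the runs that actually completed
    return [s.result() for s in states if s.is_completed()]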
m
@Florent VanDeMoortele I'm having a similar issue: when one instance of a mapped task fails, Prefect leaves the downstream task in a PENDING state instead of FAILED. I believe this prevents the allow_failure wrapper from working, too. Let me know if you find a workaround - but not looping like @Marvin suggests, since that would probably be much slower. See: https://github.com/PrefectHQ/prefect/issues/8124
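For context on the wrapper being discussed: allow_failure is meant to let a downstream task run even if the wrapped upstream dependency failed, along these lines (a sketch of its intended usage only, not a fix for the PENDING behaviour described above):

from prefect import flow, task, allow_failure

@task
def might_fail(x):
    if x == 2:
        raise ValueError("boom")
    return x

@task
def downstream():
    print("runs even though the upstream failed")

@flow(log_prints=True)
def allow_failure_example():
    f = might_fail.submit(2)
    # wrap the dependency so the downstream task is not blocked by the failure
    downstream.submit(wait_for=[allow_failure(f)])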
f
Same here @Mike Bopf. @Nate, maybe you have a suggestion?
m
I'm going to play around with sub-flows to see if they can work around this behavior. It still depends on what state the main flow is in, but I can perhaps check whether the sub-flow ends in the PENDING state and handle it as if it were FAILED. I'll report back if it works.
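A minimal sketch of that sub-flow idea, assuming the parent calls the sub-flow with return_state=True and treats a PENDING (or FAILED) final state as a failure; the task and flow names here are illustrative:

from prefect import flow, task

@task
def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x

@task
def cleanup_task():
    print("**** Cleaning up ****")

@flow
def mapped_subflow():
    # the mapped work lives in its own sub-flow
    return fails_on_two.map([1, 2, 3])

@flow(log_prints=True)
def parent_flow():
    state = mapped_subflow(return_state=True)
    if state.is_pending() or state.is_failed():
        # treat a sub-flow stuck in PENDING the same as a failure
        print("mapped work did not finish cleanly; cleaning up anyway")
    cleanup_task()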
n
could either/both of you share an MRE that I can work from?
m
Sure. In this case I'm trying to get the cleanup_task() task to run after everything else, but it never completes (i.e., the task is sometimes scheduled, but doesn't finish). The commented-out lines are things I tried that didn't work.
from prefect import flow, task, allow_failure, get_run_logger

@task
def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x

@task
def identity(y):
    return y

@task
def cleanup_task():
    get_run_logger().info("**** Cleaning up ****")

@flow
def map_with_cleanup_task():
    f = fails_on_two.map([1, 2, 3])
    c = identity.map(f)
    state = cleanup_task.submit(wait_for=[c], return_state=True)  # doesn't Complete
    # state = cleanup_task.submit(wait_for=[allow_failure(c)], return_state=True)  # doesn't Complete
    assert state.is_pending()
    # cleanup_task.submit(wait_for=[c])  # doesn't Complete
    # cleanup_task(c)  # UnfinishedRun exception thrown (PENDING state)
    # cleanup_task.submit(c)  # doesn't Complete
    # cleanup_task.submit(wait_for=identity)  # doesn't wait
    # cleanup_task.map(c)  # Runs twice, once for each successful job

if __name__ == "__main__":
    map_with_cleanup_task()
n
thanks, after looking into this a bit, I'd say that this
"check to see if the sub-flow ends in the PENDING state and handle it as if it is FAILED"
is also what I would choose to do here. Arguably, prefect should handle this automatically, as suggested here. Alternatively, if you were ok with allow_failure, a potential workaround could be just pulling out the failed task states before passing them downstream, like:
Copy code
from prefect import flow, task

@task
def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x

@task
def identity(y):
    return y

@task
def cleanup_task():
    print("**** Cleaning up ****")
    
@flow(log_prints=True)
def map_with_cleanup_task():
    f = fails_on_two.map([1,2,3], return_state=True)
    c = identity.map([s.result() for s in f if s.is_completed()])
    state = cleanup_task.submit(wait_for=c, return_state=True)

    assert state.is_completed()

if __name__ == "__main__":
    map_with_cleanup_task()
m
Thanks @Nate, that may be a viable solution. One thing I noticed is that the list comprehension "filter" appears to cause an implicit wait_for condition, in that none of the identity() tasks run until all the fails_on_two tasks complete (or error out). Do you know if that's intended? I need to check whether that's a problem in our application.
n
"the list comprehension "filter" appears to cause an implicit wait_for condition in that none of the identity() tasks run until all the fails_on_two tasks complete (or error out)"
that makes sense to me - the list comp is going to block until it calls .result() on all those futures
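For illustration, that list comprehension unrolls to roughly the loop below, which is why no identity run can be submitted until every fails_on_two state has been resolved (same f and identity as in the example above):

completed_results = []
for s in f:                                   # one state per mapped fails_on_two run
    if s.is_completed():                      # skip the runs that raised
        completed_results.append(s.result())  # result() needs the run to be finished
c = identity.map(completed_results)           # identity is only mapped after the loop ends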
f
Thanks @Mike Bopf and @Nate. For my use case, I map over 2 iterables; here is an MRE:
Copy code
from prefect import flow, task

@task
def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x

@task
def not_fails(x):
    return x

@task
def merge_task(x, y):
    return x, y

@task
def identity(y):
    return y

@task
def cleanup_task():
    print("**** Cleaning up ****")
    
@flow(log_prints=True)
def map_with_cleanup_task():
    common_input = [1,2,3]
    f = fails_on_two.map(common_input, return_state=True)
    f2 = not_fails.map(common_input)
    c = identity.map([s.result() for s in f if s.is_completed()])
    merged = merge_task.map(c, f2)
    state = cleanup_task.submit(wait_for=c, return_state=True)

    assert state.is_completed()

if __name__ == "__main__":
    map_with_cleanup_task()
I obtain this error:
Copy code
prefect.exceptions.MappingLengthMismatch: Received iterable parameters with different lengths. Parameters for map must all be the same length.
How can I do this? It's important to keep the order in the merge task because I work on the same inputs.
n
the problem is that c and f2 are not the same length in your merge_task.map(c, f2) call. While they both originate from a common_input with the same length, fails_on_two.map produces a failed state that c filters out, whereas not_fails.map is passing everything along to f2.
I'm not exactly sure how you intend for your merge task to work, but does merge_task need to be mapped? Or could it just accept the lists of results and handle whatever merging you need to do? There's also from prefect import unmapped, which may be useful here as well:
Copy code
merged = merge_task.map(c, unmapped(f2))
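A rough sketch of the non-mapped alternative Nate mentions, where the merge accepts both full result lists in one call; it reuses fails_on_two and not_fails from the MRE above, and the zip-based pairing is only a placeholder for the real merge logic:

from prefect import flow, task

@task
def merge_lists(xs, ys):
    # xs may be shorter than ys if some upstream runs failed;
    # pair items however the real merge logic requires (zip is a placeholder)
    return list(zip(xs, ys))

@flow(log_prints=True)
def merge_without_map():
    common_input = [1, 2, 3]
    f = fails_on_two.map(common_input, return_state=True)
    f2 = not_fails.map(common_input)
    kept = [s.result() for s in f if s.is_completed()]
    # a single task call over both lists; the futures in f2 are resolved as inputs
    merged = merge_lists.submit(kept, f2)
    print(merged.result())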
f
unmapped is useful for setting a parameter, but here c and f2 are variables. To make my needs explicit: common_input is a list (of size N, one element per person) of DataFrames with time-series data. The first task computes some features; the second task does too, but different types of features. In the merge task, I need, for each person, the features from both the first and the second task. If I only have results from one of the tasks, I don't compute merge_task for that person. So if everything is OK, I have N elements in c and N in f2 for the merge task. But it's also possible that I have n1 and n2 elements (with n1 <= N, n2 <= N, and n1 not necessarily equal to n2). How can I keep only the intersection of the results of the two previous tasks?
n
"unmapped is useful for setting a parameter, but here c and f2 are variables"
I'm not sure what you mean by that - unmapped is used to send an otherwise iterable task input to each mapped instance of the task in full, i.e. to avoid mapping over that input. Maybe you want something like this?
Copy code
from prefect import flow, task

@task
def fails_on_two(x):
    if x == 2:
        raise ValueError("Failed task")
    return x

@task
def not_fails(x):
    return x

@task
def merge_task(x, y):
    if not (x and y):
        return None
    return x, y

@task
def identity(y):
    return y

@task
def cleanup_task():
    print("**** Cleaning up ****")
    
@flow(log_prints=True)
def map_with_cleanup_task():
    common_input = [1,2,3]
    f = fails_on_two.map(common_input, return_state=True)
    f2 = not_fails.map(common_input)
    c = identity.map([s.result() if s.is_completed() else None for s in f])
    merged = merge_task.map(c, f2)
    state = cleanup_task.submit(wait_for=c, return_state=True)

    assert state.is_completed()

if __name__ == "__main__":
    map_with_cleanup_task()
where instead of removing failed states, we replace them with None so downstream you can tell which ones have results to merge
f
Sorry, it was clear to me but not clear when I wrote it: I use unmapped to set parameters shared by all mapped tasks. Thanks for your example, it's an idea. To be more Prefect-friendly, what's the best way to execute a pre-function before the task? You know, like the on_completion or on_failure hooks, but run beforehand as a precheck? In this pre-hook, I could check for None in the inputs.
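As far as I know, Prefect's task hooks (on_completion, on_failure) only fire after the run, so a pre-check has to live in plain Python. One hedged workaround is an ordinary decorator applied under @task that short-circuits on None inputs; skip_if_none is a hypothetical helper, not a Prefect API:

from functools import wraps
from prefect import task

def skip_if_none(fn):
    # hypothetical helper: run a pre-check before the task body executes
    @wraps(fn)
    def wrapper(*args, **kwargs):
        if any(arg is None for arg in args):
            return None  # propagate the "no result" marker downstream
        return fn(*args, **kwargs)
    return wrapper

@task
@skip_if_none
def merge_task(x, y):
    return x, y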
m
The Prefect 1 code I'm porting is different in that the tasks return None, because we are manipulating many large images. Inspired by @Nate's example, this is closer to what I need, but it uses a filter instead of a list comprehension to weed out the errors:
Copy code
from prefect import flow, task

@task
def fails_on_two(x) -> None:
    if x == 2:
        raise ValueError("Failed task(2)")
    print(f"fails_on_two: {x}")


@task
def fails_on_three(x) -> None:
    if x == 3:
        raise ValueError("Failed task(3)")
    print(f"fails_on_three: {x}")


@task
def cleanup_task():
    print("**** Cleaning up ****")


def filter_results(results):
    return filter(lambda x: not isinstance(x, BaseException), results)



@flow(log_prints=True)
def map_with_cleanup_task():
    common_input = [1, 2, 3, 4]
    f = filter_results(fails_on_two.map(common_input, return_state=True))
    g = filter_results(fails_on_three.map(common_input, wait_for=f, return_state=True))
    state = cleanup_task.submit(wait_for=g, return_state=True)

    assert state.is_completed()


if __name__ == "__main__":
    map_with_cleanup_task()
This seems to work in this MRE, but I'm still having issues getting it to work in our longer chains of about 8-10 tasks. I'll keep trying to get an MRE that exhibits the same behavior.
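For the longer chains, here is a hedged sketch of repeating that filtering between every mapped stage, using the state-based check from Nate's earlier example instead of the exception check in filter_results; the stage task, the inputs, and the number of stages are illustrative:

from prefect import flow, task

@task
def stage(x):
    # stand-in for one step of the real 8-10 task chain
    if x == 2:
        raise ValueError("Failed stage")
    return x

@task
def cleanup_task():
    print("**** Cleaning up ****")

def completed(states):
    # keep only the runs that finished successfully
    return [s for s in states if s.is_completed()]

@flow(log_prints=True)
def long_chain_with_cleanup(n_stages: int = 4):
    states = stage.map([1, 2, 3, 4], return_state=True)
    for _ in range(n_stages - 1):
        # feed only the successful results into the next mapped stage
        states = stage.map([s.result() for s in completed(states)], return_state=True)
    state = cleanup_task.submit(wait_for=completed(states), return_state=True)
    assert state.is_completed()

if __name__ == "__main__":
    long_chain_with_cleanup()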