Florent VanDeMoortele
07/19/2023, 4:11 PM
PENDING : Upstream task run '3c69fa86-cb97-4c00-963f-412747993fb8' did not reach a 'COMPLETED' state.
Marvin
07/19/2023, 4:11 PM
> PENDING : Upstream task run '3c69fa86-cb97-4c00-963f-412747993fb8' did not reach a 'COMPLETED' state.
When an upstream task run never reaches a COMPLETED state, any downstream task that depends on it is left in PENDING, which is the reason for the message you're seeing.
However, if you're desperate for a change, you might consider modifying the task's result handler or orchestrating your tasks differently. Be warned, though - this could lead to unintended consequences.
For example, instead of using map, you could use a loop over the input data and create a new task for each item. This way, if one task fails, it won't affect the others, although this approach might require more resources.
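(A rough sketch of that per-item loop, assuming Prefect 2's .submit() and future.result(raise_on_failure=False); the task and flow names are illustrative, not from this thread:)
from prefect import flow, task

@task
def process(item):
    return item * 2

@flow
def per_item_flow(items):
    # one independent task run per item instead of a single .map() call
    futures = [process.submit(item) for item in items]
    # raise_on_failure=False hands back the exception object for failed runs,
    # so one bad item doesn't stop the loop or crash the flow run
    return [f.result(raise_on_failure=False) for f in futures]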
Keep in mind, though, that I am a depressive robot with a cynicism level over 9000, and these are just suggestions. In the end, the fate of your task graph is in your hands, human.

Mike Bopf
07/24/2023, 7:24 PM
This keeps the allow_failure wrapper from working, too. Let me know if you find a workaround - but not looping like @Marvin suggests, since that would probably be much slower.
See: https://github.com/PrefectHQ/prefect/issues/8124

Florent VanDeMoortele
07/25/2023, 7:56 AM

Mike Bopf
07/25/2023, 5:31 PM

Nate
07/25/2023, 5:33 PM

Mike Bopf
07/25/2023, 7:03 PM
I want the clean_up() task to run after everything else, but it never completes (i.e., the task is sometimes scheduled, but doesn't finish). The commented-out lines are things I tried that didn't work.
from prefect import allow_failure, flow, get_run_logger, task

@task
def fails_on_two(x):
if x == 2:
raise ValueError("Failed task")
return x
@task
def identity(y):
return y
@task
def cleanup_task():
get_run_logger().info("**** Cleaning up ****")
@flow
def map_with_cleanup_task():
f = fails_on_two.map([1,2,3])
c = identity.map(f)
state = cleanup_task.submit(wait_for=[c], return_state=True) # doesn't Complete
# state = cleanup_task.submit(wait_for=[allow_failure(c)], return_state=True) # doesn't Complete
assert state.is_pending()
# cleanup_task.submit(wait_for=[c]) # doesn't Complete
# cleanup_task(c) # UnfinishedRun exception thrown (PENDING state)
# cleanup_task.submit(c) doesn't Complete
# cleanup_task.submit(wait_for=identity) # doesn't wait
# cleanup_task.map(c) # Runs twice, once for each successful job
if __name__ == "__main__":
map_with_cleanup_task()
Nate
07/26/2023, 2:57 AM
> check to see if the sub-flow ends in the PENDING state and handle it as if it is FAILED
is also what I would choose to do here. arguably, prefect should handle this automatically, as suggested here. alternatively, if you were ok with allow_failure, a potential workaround could be just pulling out the failed task states before passing them downstream, like
from prefect import flow, task
@task
def fails_on_two(x):
if x == 2:
raise ValueError("Failed task")
return x
@task
def identity(y):
return y
@task
def cleanup_task():
print("**** Cleaning up ****")
@flow(log_prints=True)
def map_with_cleanup_task():
f = fails_on_two.map([1,2,3], return_state=True)
c = identity.map([s.result() for s in f if s.is_completed()])
state = cleanup_task.submit(wait_for=c, return_state=True)
assert state.is_completed()
if __name__ == "__main__":
map_with_cleanup_task()
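(For the first option, treating a terminal PENDING state as a failure in the caller, a rough sketch of what that could look like; the flow names are illustrative, not from this thread:)
from prefect import flow

@flow
def child_flow():
    ...  # stand-in for the sub-flow that can end up stuck in PENDING

@flow
def parent_flow():
    state = child_flow(return_state=True)  # run the sub-flow and keep its state
    if state.is_pending():
        # the sub-flow never reached COMPLETED, so handle it as if it FAILED
        raise RuntimeError("sub-flow ended in PENDING; treating it as FAILED")
    return state.result()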
Mike Bopf
07/26/2023, 8:44 PM
The list comprehension "filter" appears to cause an implicit wait_for condition, in that none of the identity() tasks run until all the fails_on_two tasks complete (or error out). Do you know if that's intended? I need to check if that's a problem in our application.

Nate
07/26/2023, 9:08 PMthe list comprehension "filter" appears to cause an implicitthat makes sense to me, the list comp is going to block until it callscondition in that none of thewait_for
tasks run until all theidentity()
tasks complete (or error out).fails_on_two
.result()
on all those futuresFlorent VanDeMoortele
07/27/2023, 1:21 PM
from prefect import flow, task
@task
def fails_on_two(x):
if x == 2:
raise ValueError("Failed task")
return x
@task
def not_fails(x):
return x
@task
def merge_task(x, y):
return x, y
@task
def identity(y):
return y
@task
def cleanup_task():
print("**** Cleaning up ****")
@flow(log_prints=True)
def map_with_cleanup_task():
common_input = [1,2,3]
f = fails_on_two.map(common_input, return_state=True)
f2 = not_fails.map(common_input)
c = identity.map([s.result() for s in f if s.is_completed()])
merged = merge_task.map(c, f2)
state = cleanup_task.submit(wait_for=c, return_state=True)
assert state.is_completed()
if __name__ == "__main__":
map_with_cleanup_task()
I obtain this error:
prefect.exceptions.MappingLengthMismatch: Received iterable parameters with different lengths. Parameters for map must all be the same length.
How can I do this? It's important to keep the order in the merge task because I work on the same inputs.

Nate
07/27/2023, 3:19 PM
c and f2 are not the same length in your merge_task.map(c, f2) call. while they both originate from a common_input with the same length, fails_on_two.map produces a failed state that c filters out, whereas not_fails.map is passing everything along to f2.
i'm not exactly sure how you intend for your merge task to work, but does merge_task need to be mapped? or could it just accept the lists of results and handle whatever merging you need to do?
there's also from prefect import unmapped, which may be useful here as well:
merged = merge_task.map(c, unmapped(f2))
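(And a rough sketch of the non-mapped alternative, i.e. a single merge task that receives both result lists in full; Prefect should resolve the futures passed inside the lists. Names are illustrative:)
from prefect import task

@task
def merge_all(xs, ys):
    # receives both result lists in full and does the pairing itself
    return list(zip(xs, ys))

# inside the flow, instead of merge_task.map(c, f2):
merged = merge_all(c, f2)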
Florent VanDeMoortele
07/27/2023, 3:47 PM
unmapped is useful to set a parameter, but here c and f2 are variables. To clarify my needs: common_input is a list (size N, one element per person) of DataFrames with timeseries data. The first task computes some features; the second task does too, but different types of features. In the merge task, I need, for each person, the features from both the first and the second task. If I only have results from one of the tasks, I don't want to run merge_task for that specific person.
So, if everything is OK, I have N elements in c and N in f2 for the merge task. But it's also possible that I have n1 and n2 (with n1 <= N, n2 <= N, and n1 not necessarily equal to n2). How can I keep only the intersection of the two results of the previous tasks?

Nate
07/27/2023, 4:10 PMi'm not sure what you mean by that,is useful to set a parameter, but hereunmapped
andc
are variablesf2
unmapped
is used to send an otherwise iterable task input to each mapped instance of the task in full, i.e. to avoid mapping over that input
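(a toy illustration of that, with made-up names:)
from prefect import flow, task, unmapped

@task
def lookup(key, table):
    return table[key]

@flow
def demo():
    table = {"a": 1, "b": 2, "c": 3}
    # the keys are mapped over; unmapped(table) hands the whole dict to every
    # mapped run instead of mapping over the dict itself
    return lookup.map(["a", "b", "c"], unmapped(table))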
maybe you want something like this?
from prefect import flow, task
@task
def fails_on_two(x):
if x == 2:
raise ValueError("Failed task")
return x
@task
def not_fails(x):
return x
@task
def merge_task(x, y):
if not (x and y):
return None
return x, y
@task
def identity(y):
return y
@task
def cleanup_task():
print("**** Cleaning up ****")
@flow(log_prints=True)
def map_with_cleanup_task():
common_input = [1,2,3]
f = fails_on_two.map(common_input, return_state=True)
f2 = not_fails.map(common_input)
c = identity.map([s.result() if s.is_completed() else None for s in f])
merged = merge_task.map(c, f2)
state = cleanup_task.submit(wait_for=c, return_state=True)
assert state.is_completed()
if __name__ == "__main__":
map_with_cleanup_task()
where instead of removing failed states, we replace them with None, so downstream you can tell which ones have results to merge
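(a rough sketch of then recovering the "intersection" downstream in that flow: since merge_task returns None whenever either input is missing, the intersection is just the non-None merge results)
# inside map_with_cleanup_task, after merged = merge_task.map(c, f2):
results = [m.result() for m in merged]        # resolve the mapped futures
both = [r for r in results if r is not None]  # keep only persons where both tasks succeeded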
Florent VanDeMoortele
07/28/2023, 7:57 AM
OK, I see: unmapped is used to set parameters shared by all mapped tasks.
Thanks for your example, it's an idea. To be more Prefect-friendly, what's the best way to execute a pre-function before a task? You know, like the hooks with on_completion or on_failure, but run beforehand, as a precheck? In this pre-hook, I could check for None in the inputs.
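(As far as I know, Prefect 2 only ships on_completion / on_failure hooks for tasks, not a before-run hook, so one workaround is a plain decorator around the task function that does the precheck; a sketch with made-up names:)
import functools

from prefect import task

def skip_if_none(fn):
    """Precheck wrapper: skip the task body when any input is None."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        if any(arg is None for arg in args):
            return None  # an upstream task produced no result for this person
        return fn(*args, **kwargs)
    return wrapper

@task
@skip_if_none
def merge_task(x, y):
    return x, y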
Mike Bopf
07/28/2023, 2:01 PM
I'd rather not pass None around, because we are manipulating many large images. Inspired by @Nate's example, this is closer to what I need, but uses a filter instead of a list comprehension to weed out the errors:
from prefect import flow, task
@task
def fails_on_two(x) -> None:
if x == 2:
raise ValueError("Failed task(2)")
print(f"fails_on_two: {x}")
@task
def fails_on_three(x) -> None:
if x == 3:
raise ValueError("Failed task(3)")
print(f"fails_on_three: {x}")
@task
def cleanup_task():
print("**** Cleaning up ****")
def filter_results(results):
return filter(lambda x: not isinstance(x, BaseException), results)
@flow(log_prints=True)
def map_with_cleanup_task():
common_input = [1, 2, 3, 4]
f = filter_results(fails_on_two.map(common_input, return_state=True))
g = filter_results(fails_on_three.map(common_input, wait_for=f, return_state=True))
state = cleanup_task.submit(wait_for=g, return_state=True)
assert state.is_completed()
if __name__ == "__main__":
map_with_cleanup_task()
This seems to work in this MRE, but I'm still having issues getting it to work in our longer chains of about 8-10 tasks. I'll keep trying to get an MRE that exhibits the same behavior.