# prefect-community
t
Hello 🙋 let's say i have a mapped task `X` that spawns several "children" tasks `Y1` ... `Yn`. let's say some of them are stuck on `Running` as their final state - but i wanna guarantee the next phase in the flow executes correctly and ignores the few "hung" tasks -- what's the (default) behavior if i set those tasks (e.g. `Y3`, `Y17` and `Y91`) to `Skipped`? would the next task that depends on them still get executed (even if it has the default `all_successful` trigger)? the reason i'm asking about `Skipped` is that i wanna avoid a `None` response flowing downstream from these tasks
a
there is no solution for it when the pod dies; we discussed it already in previous issues and on GitHub. I understand your pain here but we don't have the capacity to tackle it in 1.0; we are aware of it and we'll tackle it in 2.0
t
in pseudocode it would be:
```python
results = X.map(inputs)        # mapped task spawns one child run per input
final_output = Z(results)      # downstream task consuming all mapped results
```
my question is not theoretical: i have an actual 24-hour flow run that's running now and is about to finish, and i'm trying to make sure the last step executes, that's all 🙂 i'm not looking for bug fixes or involvement from the development team here
you're saying there is no state i can put those tasks in to guarantee that the flow finishes successfully (even if it misses 3-4 of the 580 subtasks)?
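A fuller, runnable version of that pseudocode in Prefect 1.x might look like the sketch below; `X`, `Z`, and the inputs are just stand-ins for the thread's real tasks, not the actual flow.
```python
from prefect import task, Flow

# Hypothetical stand-ins for the real X and Z tasks in this thread.
@task
def X(item):
    return item * 2

@task
def Z(results):
    return sum(results)

with Flow("map-then-reduce") as flow:
    inputs = [1, 2, 3]
    results = X.map(inputs)        # one child task run per input
    final_output = Z(results)      # default all_successful trigger, skip_on_upstream_skip=True

# flow.run()  # local run; in the thread the flow runs via an agent instead
```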
i'm talking about a manual change of state:
a
there are 2 things:
1. To determine the flow run state, you can set reference tasks: https://docs.prefect.io/core/concepts/flows.html#reference-tasks
2. To manually change task run states, you can do that either through the UI or use the `set_task_run_states` mutation
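A minimal sketch of option 2 using the Prefect 1.x Python client instead of raw GraphQL (the client wraps the same `set_task_run_states` mutation); the task run ID is a placeholder, and the exact keyword arguments of `Client.set_task_run_state` can differ a bit between 1.x releases, so check the interactive API docs.
```python
from prefect import Client
from prefect.engine.state import Skipped

client = Client()

# Manually move a hung mapped child task run to Skipped.
client.set_task_run_state(
    task_run_id="<task-run-uuid>",   # placeholder id of the stuck task run
    state=Skipped("Manually skipped a hung mapped child"),
)
```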
t
i know how to change the state. the question was how would downstream tasks behave if some of the upstream tasks are `Skipped` - would they still run?
a
it depends on the triggers; `Skipped` is considered the same as `Success`
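A quick illustration of why that holds in Prefect 1.x: `Skipped` is implemented as a subclass of `Success`, so success-based triggers count a skipped upstream as a success.
```python
from prefect.engine.state import Skipped, Success

# Skipped subclasses Success in Prefect 1.x, so triggers such as
# all_successful treat a skipped upstream the same as a successful one.
assert issubclass(Skipped, Success)
assert Skipped().is_successful()
```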
t
@Anna Geller finally, all 580 tasks finished except 3 that failed and cannot be salvaged. i set those to `Skipped` and then restarted the flow --- and then the downstream task was skipped as well and i got:
```
Upstream task was skipped; if this was not the intended behavior, consider changing `skip_on_upstream_skip=False` for this task.
```
on the downstream task. is there any other state i can put them in that would force the downstream task to run for this existing flow run? (running the entire flow takes 24h so i don't wanna redo it) or is there another way to force the downstream task to run?
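For anyone who lands on this thread with the same problem, here is a minimal Prefect 1.x sketch of how the downstream task could have been declared so that skipped or even failed mapped upstreams don't block it; `combine` is a placeholder name, not the actual code from this flow.
```python
from prefect import task
from prefect.triggers import all_finished

# skip_on_upstream_skip=False stops manually-Skipped upstreams from cascading;
# swapping the default all_successful trigger for all_finished also tolerates
# a few outright failures, as long as the task body filters out the bad inputs.
@task(trigger=all_finished, skip_on_upstream_skip=False)
def combine(results):
    return [r for r in results if r is not None and not isinstance(r, BaseException)]
```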
a
not sure what's the best way; perhaps manually starting those tasks via a separate flow triggered with `flow.run()`?
t
@Anna Geller but it's a downstream task; if i start a new flow or run them manually they won't have the inputs from upstream..? basically the flow is:
1. query Snowflake
2. save to CSV
3. split the CSV into (580) chunks of size 100
4. map them to a processing task
5. combine all 580 outputs into a single CSV
6. upload
• 577 tasks of type (4) finished successfully, and 3 failed
• even if i set them to `Skipped`, (5) refuses to run - and instead gets skipped as well
• if i set them to `Cancelled` - they are considered failed and therefore (5) refuses to run
• if i set them to `Successful` - (5) will run, but it will fail due to an error since i'm not verifying that my inputs are not `None`
i looked into the GraphQL docs and there doesn't seem to be a way to change a task's trigger on the fly? (if i could just change it to `any_successful` i guess it would be enough?)
OK i think i found two very extreme workarounds (and i'm not sure the second one would work):
1. manipulate the checkpointed output of step (3) to make the input to (4) valid, then set the failed tasks to `Pending` and rerun them
2. fix the Docker image itself which runs the code so that it can better handle bad (basically empty) inputs
i'm not sure if the 2nd solution would work because i'm not sure if it would re-pull the Docker image from ECR or not
like so:
```python
import cloudpickle

# Load the checkpointed output of the CSV-splitting task (step 3),
# downloaded locally from result storage.
with open('salvage_me', 'rb') as f:
    zoom = cloudpickle.load(f)

# Patch the three empty chunks with some valid input so the mapped
# processing task (step 4) no longer fails on them.
mylines = '<some valid input>'
zoom[577] = (zoom[577][0], mylines)
zoom[578] = (zoom[578][0], mylines)
zoom[579] = (zoom[579][0], mylines)

# Write the patched checkpoint back out so it can be re-uploaded.
with open('salvaged', 'wb') as f:
    cloudpickle.dump(zoom, f)
```
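Some context on why plain cloudpickle works on that file: Prefect 1.x checkpoints task outputs through a `Result` class, and the default `PickleSerializer` is cloudpickle-based, so a checkpointed S3 object can be downloaded, patched, and re-uploaded exactly like this. The bucket and location below are placeholders, not the thread's actual configuration.
```python
from prefect import task
from prefect.engine.results import S3Result
from prefect.engine.serializers import PickleSerializer

# Hypothetical: if step (3) were checkpointed like this, its output would live
# in S3 as a cloudpickle blob, which is what the snippet above edits by hand.
@task(
    checkpoint=True,
    result=S3Result(
        bucket="my-bucket",                # placeholder bucket
        location="salvage_me",             # placeholder object key
        serializer=PickleSerializer(),     # cloudpickle under the hood (the default)
    ),
)
def split_csv(path):
    ...
```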
looks like Prefect didn't like it, and my flow is now stuck on this:
hallelujah - it finished successfully … talk about negative engineering 😞
a
Congratulations, glad you got it working
And thanks for sharing your solution, perhaps it helps someone else with the same problem
t
@Anna Geller thanks - and sorry for the long (😳) writeup, but some feedback for Prefect, based on this experience of running a heavy flow (in case not all of these are addressed in 2.0 already):
• my life would have been much simpler if the GraphQL tooling allowed modifying an existing task's triggers or other attributes (e.g. if i could change the task's attribute to `skip_on_upstream_skip=False`). If the goal is to prevent negative engineering, why not give users more power over their existing flow run, since changing the code would necessarily require a new flow run, which could be costly (both in time and in money or other resources)
• ultimately, these 3 tasks failed due to various "weaknesses" in my own code and/or in Prefect, namely:
  ◦ my shell script did not create an output when the input was empty (this then led the task to fail when it tried to read the output file and provide it as the result of the task)
  ◦ their input was empty because i had a small bug in my code, in the upstream task which splits the CSVs
  ◦ even if i wanted to force the failed tasks to be marked (artificially) as `Successful`, this would not have been enough because the downstream task assumed all its inputs are valid (e.g. no `None`)
  ◦ the downstream task did not have `skip_on_upstream_skip=False` set on it, so i could not skip the upstream tasks
  ◦ the trigger for the downstream task was `all_successful` (which is the default), so it did not tolerate even 3 failures out of 580
• I started to write here various ideas for how Prefect could be more "fool-proof", but honestly i'm not sure i have a clear and universally good idea at the moment - most of the ideas i had basically involved users themselves taking part in some defensive engineering (which you like to call negative engineering) -- which is exactly the kind of thing Prefect aims to eliminate.
• but there must be some ways for Prefect to be more tolerant to issues like the ones i described, and make it easier to recover from catastrophes without repeating work and wasting resources... 🤔 if users have to write 100% correct Prefect flows ahead of time (and a single wrong attribute or trigger could make the entire thing fail after tons of time & resources have been spent running the flow), then this just pushes the negative engineering problem one level "upwards", from the business logic to the orchestration-code level, without eliminating it: instead of making sure my own code can deal with failures, i'm making sure Prefect can deal with failures, since a single misconfiguration could spell disaster (in this case, i only managed to salvage it with some "hacking", i.e. artificially manipulating checkpointed data on S3)
a
Thanks for the feedback, I'm sure we can tackle many of the pain points in Prefect 2.0