Thread
#prefect-community
    Tom Klein

    Tom Klein

    2 months ago
    Hello 🙋 let's say i have a mapped task
    X
    that spawn several "children" tasks
    Y1
    ...
    Yn
    let's say some of them are stuck on
    Running
    as a finished state - but i wanna guarantee the next phase in the flow executes correctly and ignores the few "hung" tasks -- what's the (default) behavior for if i set those tasks (e.g.
    Y3
    ,
    Y17
    and
    Y91
    ) to
    Skipped
    ? would the next task that depends on them still get executed (even if it has the default
    all_successful
    trigger?) the reason i'm asking about
    Skipped
    is because i wanna avoid a
    None
    response flowing downstream from these tasks
    Anna Geller

    Anna Geller

    2 months ago
    there is no solution for it when the pod dies; we discussed it already in previous issues and on GitHub. I understand your pain here but we don't have the capacity to tackle it in 1.0, we are aware and we'll tackle it in 2.0
    Tom Klein

    Tom Klein

    2 months ago
    in pseudo code it would be:
    results = X.map(inputs)
    final_output = Z(result)
    my question is not theoretical, i have an actual 24 hour flow run that’s running now and is about to finish, and i’m trying to make sure the last step executes, that’s all 🙂 i’m not looking for bug fixes or involvement of the development team here
    you're saying there is no state i can put those tasks in to guarantee that the flow finishes successfully (even if it misses 3-4 of the 580 subtasks) ?
    i'm talking about a manual change of state:
    Anna Geller

    Anna Geller

    2 months ago
    there are 2 things:1. To determine the flow run state, you can set reference tasks https://docs.prefect.io/core/concepts/flows.html#reference-tasks 2. To manually change task run states, you can do that either through the UI or use the mutation set_task_run_states
    Tom Klein

    Tom Klein

    2 months ago
    i know how to change the state. the question was how would downstream tasks behave if some of the upstream tasks are
    Skipped
    - would they still run ?
    Anna Geller

    Anna Geller

    2 months ago
    it depends on triggers, Skipped is considered the same as Success
    Tom Klein

    Tom Klein

    2 months ago
    @Anna Geller finally, all 580 tasks finished except 3 that failed and cannot be salvaged i set those to
    Skipped
    and then restarted the flow --- and then the downstream task was skipped as well and i got :
    Upstream task was skipped; if this was not the intended behavior, consider changing `skip_on_upstream_skip=False` for this task.
    on the downstream task is there any other state i can put them on that would force the downstream task to run for this existing flow run ? (running the entire flow takes 24h so i don’t wanna redo it) or is there another way to force the downstream task to run?
    Anna Geller

    Anna Geller

    2 months ago
    not sure what's the best way, perhaps manually starting those tasks via a separate flow triggered with flow.run()?
    Tom Klein

    Tom Klein

    2 months ago
    @Anna Geller but it’s a downstream task, if i start a new flow or run them manually they won’t have the inputs from upstream..? basically the flow is:1. query snowflake 2. save to CSV 3. split CSV to (580) chunks of size 100 4. map them to a processing task 5. combine all 580 outputs to a single CSV 6. upload
    577 tasks of type (4) finished successfully, and 3 failed • even if i set them to skipped, (5) refuses to run - and instead gets skipped as well • if i set them to Cancelled - they are considered failed and therefore (5) refuses to run • if i set them to Successful - (5) will run, but it will fail due to an error since i’m not verifying my inputs are not
    None
    i looked into the GraphQL docs and there doesn’t seem to be a way to change a task’s trigger on the fly? (if i could just change it to
    any successful
    i guess it would be enough?
    OK i think i found two very extreme workarounds (and i'm not sure the second one would work): 1. to manipulate the checkpoint'ed output of step (3) to make the input to (4) valid, then set the failed tasks to pending and rerun them 2. to fix the Docker image itself which runs the code so that it can better handle bad (basically empty) inputs i'm not sure if the 2nd solution would work cause i'm not sure if it would re-pull the Docker image from ECR or not
    like so:
    import cloudpickle
    
    
    with open('salvage_me', 'rb') as f:
           zoom = cloudpickle.load(f)
    
    
    mylines = '<some valid input>'
    zoom[577] = tuple([zoom[577][0], mylines])
    zoom[578] = tuple([zoom[578][0], mylines])
    zoom[579] = tuple([zoom[579][0], mylines])
    
    
    with open('salvaged', 'wb') as f:
           cloudpickle.dump(zoom, f)
    looks like
    Prefect
    didn’t like it, and my flow is now stuck on this:
    hallelujah - it finished successfully … talk about
    negative engineering
    😞
    Anna Geller

    Anna Geller

    2 months ago
    Congratulations, glad you got it working
    And thanks for sharing your solution, perhaps it helps someone else with the same problem
    Tom Klein

    Tom Klein

    2 months ago
    @Anna Geller thanks - and sorry for the long (😳) writeup but - but- some feedback for prefect, based on this experience of running a heavy flow (in case not all of these are addressed in 2.0 already): • my life would have made much simpler if the GraphQL tool allowed to modify (existing) task's triggers or other attributes (e.g. , if i could change the task's attribute to
    skip_on_upstream_skip=False
    . If the goal is to prevent negative engineering - why not give users more power over their existing flow run - since changing the code would necessarily obligate a new flow run which could be costly (both in time and in money or other resources) • ultimately, these 3 tasks failed due to various "weaknesses" in my own code and/or in Prefect - namely: ◦ my shell script did not create an output when the input was empty (this then led the task to fail when it tried to read the output file and provide it as a result of the task) ◦ their input was empty because i had a small bug in my code, in the upstream task which splits the CSVs ◦ even if i wanted to force the failed tasks to be marked (artificially) as
    Successful
    this would not have been enough because the downstream task assumed all its inputs are valid (e.g. no
    None
    ) ◦ the downstream task did not have
    skip_on_upstream_skip=False
    set on it, so i could not skip the upstream tasks ◦ the trigger for the downstream task was
    all successful
    (which is the default) so it did not tolerate even 3 failures out of 580 • I started to write here various ideas for how Prefect could be more "fool-proof" but honestly i'm not sure i have a clear and universally good idea at the moment - most of the ideas i had basically involved users themselves taking part in some defensive engineering (which you like to call negative engineering ) -- which is exactly the kind of thing Prefect aims to eliminate. • but, there must be some ways for Prefect to be more tolerant to issues like the ones i described, and make it easier to recover from catastrophes without repeating work and wasting resources... 🤔 if users have to write 100% correct Prefect flows always ahead of time (and a single wrong attribute or trigger could make the entire thing fail after tons of time & resources have been spent running the flow) --- then this just pushes the negative engineering problem one level "upwards", from the business logic to the orchestrating-code level --- without eliminating it: instead of making sure my own code can deal with failures, i'm making sure prefect can deal with failures, since a single misconfiguration could spell disaster (in this case, i only managed to salvage it with some "hacking" - i.e. artificially manipulating checkpointed data on
    S3
    )
    Anna Geller

    Anna Geller

    2 months ago
    Thanks for the feedback, I'm sure we can tackle many of the pain points in Prefect 2.0