Hello, We’re looking at Prefect for some of our m...
# prefect-community
c
Hello, We’re looking at Prefect for some of our more etl-type workflows and an important feature here is that a Prefect task may wish to submit a job to a delegate system- for example EMR or AWS batch. One consideration here is what we should do if a flow is kicked off and we subsequently realise that it needs to be stopped (e.g. because there is some bug in our processing code and the flow is therefore going to yield incorrect results). In this case, the ideal situation would be to have both an easy way of stopping the flow and a mechanism such that any external tasks spawned by the flow (e.g. an AWS batch run) are also terminated. So far, my understanding of Prefect is that you can’t really cancel a running flow. The best you can do is mark a flow run as failed, but I think all this does is stop any pending tasks being scheduled- it doesn’t actually interrupt any running tasks. Moreover it doesn’t allow any cleanup to be done. For example any prefect resource managers in play won’t have their cleanup() methods invoked. This being the case, I think the best that you an do is to manually locate and cancel the external task and then go back and mark the prefect flow as failed. It does, however, look like there’s some active development in this area- specifically: https://github.com/PrefectHQ/prefect/pull/2942. I therefore wondered what the long term goal was here and whether it might support a use case similar to the one I have outlined? In our case, what would be ideal is if the task class itself supported some sort of callback which would alert you to the fact the task was being cancelled.
👀 1
l
Hi @Chris Martin! Welcome to Prefect 🙂 Yes, you are correct that as of today cancellation is only supported insofar as Prefect stops scheduling tasks for that flow, not actively stopping ones that currently exist, and that the latest and greatest in trying to achieve the latter is in the issue you reference #2942 and related changes in our cloud product (which should be released next Monday). Right now how that is implemented is not via an arbitrary callback that might uniquely stop and/or cleanup a delegate resource like you describe, but to interrupt the python execution directly through a SIGINT signal. If your resource propagates SIGINT to something meaningful to you for cleanup, then that is the closest thing I think you can hook into with what is in flight for cancellation right now. That being said if that doesn’t meet your needs, opening an issue with what you are looking for can definitely get it on our radar for the next iteration!
By the way, I just got confirmation from the folks working on cancellation that per-task cancellation callbacks has been discussed internally (just isn’t in this iteration yet) — so — if nothing else your message is a good +1 for that feature!
c
Hi @Laura Lorenz (she/her) thanks for your quick reply. Am I right in thinking that after #292 is released I should be able to define a task something like:
Copy code
@task
def task_with_cleanup(x):
    try:
        // do work here
    except KeyboardInterrupt:
        // flow is being cancelled do cleanup here
        sys.exit()
l
I believe so -- I haven’t tested it personally but it should be since signint turns into KeyboardInterrupt in python and we aren’t doing anything otherwise fancy afaik with the exception in the task! I do think it would be best if you re-raise the KeyboardInterrupt at the end of your custom cleanup code, looking at the PR for #2942, because we catch it later to set the flow state properly
c
ok great- in which case I'm definitely excited about the next release!
🦜 1
Hi @Laura Lorenz - I saw the PR was merged so I was just trying to test this. Unfortunately I can't seem to get the KeyboadInterrupt to trigger. Do you know if this should work now and, if so, how you can get the task to interrupt froom the prefect cloud UI?
l
Hi @Chris Martin ! Even though the core PR was merged I think there is still something that needs to be exposed in the UI to trigger its behavior as opposed to the “cancellation lite” style from before (which doesn’t try to stop tasks in flight), but my info might be a bit out of date. Friendly ping to @Jim Crist-Harif in case he can chime in or grab someone who knows the latest on this at some point :)
j
Hi @Chris Martin, cancellation was reworked in 0.13.0 - as of now all the pieces except the UI have been updated to support true full cancellation running flow runs. Note that the presence/absence of
KeyboardInterrupt
on cancellation is an implementation detail, and isn't something I would recommend relying on. If you need a resource to be cleaned up when a python process exits, I recommend looking into the
atexit
or
weakref.finalize
handlers in the Python standard library. We hope to update the UI in the next week to add a button to cancel a flow run, but in the meantime you can manually call the
cancel_flow_run
graphql route to cancel an active flow run.
c
Hi- thanks @Jim Crist-Harif . Is there any way I can get notified when that change is released (it'll save me scouring the ui for a cancellation button for the next week!) As for the way to detect cancellations- I'll be honest, I'm less fond of using the two mechanisms you propose than I am from using
keyboardInterrupt
mainly because I really want to detect that a cancellation has been requested and both
atexit
or
weakref.finalize
are somewhat abstracted away from that. Do you think there's any chance of adding an
on_cancel
method to
Task
. That way it would be much more explicit about what was going on.
j
We're still thinking about the best way to do that, right now we're leaning towards making it so
cleanup
tasks in a resource manager always run if the
setup
task runs, even if cancellation occurs. Definitely on our roadmap.
Is there a reason you need to know that cancellation occurred specifically? What code do you want to run on cancellation that you wouldn't want to run in other cases?
Is there any way I can get notified when that change is released (it'll save me scouring the ui for a cancellation button for the next week!)
Definitely, I'll make a note to ping you when that's released.
c
Definitely, I'll make a note to ping you when that's released.
Awesome, much appreciated
Is there a reason you need to know that cancellation occurred specifically? What code do you want to run on cancellation that you wouldn't want to run in other cases?
You may have a fair point here. I guess the common case here is controlling something like AWS batch with Prefect. You make an API call to submit a batch run and then make subsequent API calls to poll the status of the run so as to check for some terminal state. All of the terminal states guarantee that the resources acquired by the batch run have been released, so if the task runs to terminal then there's no cleanup to do. Incidentally, I don't think your current resource manager works great here because you don't know the resource that needs tearing down until the task's run() method has been called, although possibly I'm missing a pattern here! Now, what this means if the run() method has executed then there's no cleanup to do. If however, the run() method is interrupted (either by a cancellation or by some unhandelled exception) then we really want to stop the AWS batch job. I guess my argument in this case is that I'd much rather have an explicit callback than hook into a systemwide hook for a system I don't control. Moreover I think there is some value between being able to differentiate between a task cancellation (which I think we can assume means that the system is generally in a good state) and an an arbitrary termination (in which case you can't really know what state the system will be in).
j
Definitely, I'll make a note to ping you when that's released.
Hi @Chris Martin, the cancellation button is now available in the cloud UI (I think this went live late last week). Apologies for the late notice here.