# prefect-server
a
Hello 👋 I’m trying to understand state_handlers in Task. When a Task is currently running and I cancel the flow are the state_handlers executed? I ask b/c I have a task which calls out to a remote resource and waits, then when the flow is cancelled that task should let the remote resource know we’re no longer waiting.
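```python
from typing import Any, Union

from prefect import Flow, Task
from prefect.engine.state import Cancelled, Cancelling, State


def cancel(obj: Union[Task, Flow], _: State, new_state: State) -> State:
    # State handler: react when the run moves into a cancelling/cancelled state
    if isinstance(new_state, (Cancelling, Cancelled)):
        obj.some_arg.cancel()  # let them know we're no longer waiting

    return new_state


class RemoteTask(Task):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Copy user-supplied handlers (may be None) so the caller's list isn't mutated
        state_handlers = list(kwargs.pop('state_handlers', None) or [])
        state_handlers.append(cancel)
        kwargs['state_handlers'] = state_handlers
        super().__init__(*args, **kwargs)
```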
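As far as I understand Prefect 1.x, a task's state handlers run only when the task runner itself moves that task into a new state. The toy model below is not Prefect's code: it assumes handlers are called in order, each receiving the state returned by the previous one, and uses stand-in `State`, `Running`, `Cancelled`, `FakeTask` and `FakeRemote` classes. It shows that `cancel` fires only if something actually performs the `Cancelled` transition for the running task; if cancellation tears the run down without that transition, the handler never gets a chance to run.

```python
from typing import Any, Callable, List, Optional


class State:
    """Toy stand-in for a Prefect state (not prefect.engine.state)."""


class Running(State):
    pass


class Cancelled(State):
    pass


Handler = Callable[[Any, State, State], Optional[State]]


def set_state(task: Any, handlers: List[Handler], old: State, new: State) -> State:
    """Call each handler in order, feeding each one the state returned by the last."""
    for handler in handlers:
        new = handler(task, old, new) or new  # a handler returning None keeps the state
    return new


class FakeRemote:
    """Hypothetical remote resource that records whether it was told to stop."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True


class FakeTask:
    """Hypothetical task holding a handle to its remote resource."""

    def __init__(self, remote: FakeRemote) -> None:
        self.some_arg = remote


def cancel(obj: Any, _: State, new_state: State) -> State:
    if isinstance(new_state, Cancelled):
        obj.some_arg.cancel()  # let the remote side know we're no longer waiting
    return new_state
```

For example, `set_state(FakeTask(remote), [cancel], Running(), Cancelled())` calls `remote.cancel()`, while a transition to `Running()` leaves the remote untouched.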
k
Hey @Alexander van Eck, we’ve discussed this a bit internally. We do recognize there is a problem with graceful shutdowns when flows are cancelled. There is an issue here (I think it’s similar to what you describe).
a
This is exactly our use case! We have very long-running tasks and sometimes they lock up the compute for _hours_… 😬 Any timeline on this?
k
Unfortunately not in the short term. I really don’t know, because it needs to be designed and scoped, and the solution may even extend beyond state handlers (think agent handlers). It came up in the last product roadmap but was deferred a bit because we have more immediate stuff coming up.
a
Understandable. Could I perhaps ask for some pointers on how to extend CloudFlowRunner to get this working for the project I’m on? I’m thinking the SIGINT that `CloudFlowRunner.check_for_cancellation.interrupt_if_cancelling` throws back to the main thread should somehow be handled by the main thread, which would then go over all tasks and call their state handlers?
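The idea above could look roughly like this stdlib-only sketch. It assumes a watcher thread calls `_thread.interrupt_main()`, which raises `KeyboardInterrupt` in the main thread as long as Python's default SIGINT handler is installed, and that running tasks register cleanup callbacks in a hypothetical `CLEANUP_CALLBACKS` list. `watch_for_cancellation`, `run_flow` and `CLEANUP_CALLBACKS` are illustrative names, not CloudFlowRunner API.

```python
import _thread
import threading
import time
from typing import Callable, List

# Hypothetical registry: each running task registers a callback that tells its
# remote resource to stop. This is not an existing CloudFlowRunner attribute.
CLEANUP_CALLBACKS: List[Callable[[], None]] = []


def watch_for_cancellation(is_cancelling: Callable[[], bool], interval: float = 0.01) -> None:
    """Background thread: poll a cancellation check, then interrupt the main thread."""
    while not is_cancelling():
        time.sleep(interval)
    # Simulates SIGINT; Python's default handler raises KeyboardInterrupt in the main thread.
    _thread.interrupt_main()


def run_flow(is_cancelling: Callable[[], bool], timeout: float = 5.0) -> bool:
    """Simulated flow run. Returns True if it was interrupted and cleanups ran."""
    deadline = time.monotonic() + timeout
    watcher = threading.Thread(
        target=watch_for_cancellation, args=(is_cancelling,), daemon=True
    )
    try:
        watcher.start()  # started inside the try so an early interrupt is still caught
        while time.monotonic() < deadline:
            time.sleep(0.01)  # stand-in for the main thread waiting on task results
        return False
    except KeyboardInterrupt:
        # Respond to the interrupt by telling every registered remote resource to stop.
        for callback in CLEANUP_CALLBACKS:
            callback()
        return True
```

Only the main thread receives the interrupt, which is why the sketch routes cleanup through registered callbacks rather than relying on code running on other threads to notice it.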
k
Yeah wanna put that in the issue?
a
Added
k
Thank you 🙂
I saw on GitHub you asked for a workaround. I’m not sure there is one, because this might need to be a separate process independent of the flow.
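One way to read "a separate process independent of the flow" is a small watchdog child process. This sketch is an assumption, not something Prefect provides: the child blocks reading its stdin pipe, and when the parent exits for any reason (including `SIGKILL`) the OS closes the parent's end of that pipe, so the child sees end-of-file and runs the cleanup, as long as no other process inherited the write end. `start_watchdog`, `WATCHDOG_SOURCE` and the job id are hypothetical, and the `print` stands in for a real call such as `requests.delete(...)`.

```python
import subprocess
import sys

# Code run by the watchdog child. It blocks until its stdin reaches EOF, which
# happens once the parent's end of the pipe is closed (e.g. the parent died).
WATCHDOG_SOURCE = """
import sys
job_id = sys.argv[1]
sys.stdin.buffer.read()  # returns only when the parent's end of the pipe closes
# Hypothetical cleanup: a real version might call the remote service here.
print(f"cancelling remote job {job_id}", flush=True)
"""


def start_watchdog(job_id: str) -> subprocess.Popen:
    """Spawn a child process that cleans up `job_id` once this process goes away."""
    return subprocess.Popen(
        [sys.executable, "-c", WATCHDOG_SOURCE, job_id],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
```

If the job finishes normally, the flow could call `watchdog.kill()` so no cleanup fires; if the flow process disappears, the watchdog still runs.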
a
Yeah - I’ll have to investigate and understand more of the FlowRunner to see where to add to it. 🤔
@Kevin Kho Just read all of the Orion docs. Good stuff! Is the logic surrounding cancellation of flows and how to clean up tasks going to be addressed in 2.0? For example, will tasks be able to execute a `finally` clause?
```python
from time import sleep

import requests
from prefect import task


@task
def long_running_task():
    response = requests.post(...)  # start long running job (URL/payload elided)
    try:
        while True:
            status = requests.get(...).text  # check if job is done
            if status == 'ok':
                break
            sleep(10)
    finally:
        requests.delete(...)  # tell job to stop running
```
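Here is a minimal, testable version of the same `try`/`finally` idea, with the HTTP calls injected as plain functions; `poll_remote_job` and its `start`, `get_status` and `stop` parameters are hypothetical stand-ins for the `requests` calls. It shows what `finally` covers: the stop call runs on normal completion and when an exception such as `KeyboardInterrupt` is raised inside the loop, but not if the process is killed outright.

```python
import time
from typing import Callable


def poll_remote_job(
    start: Callable[[], str],
    get_status: Callable[[str], str],
    stop: Callable[[str], None],
    interval: float = 10.0,
) -> str:
    """Start a remote job, poll until it reports 'ok', and always tell it to stop."""
    job_id = start()
    try:
        while get_status(job_id) != "ok":
            time.sleep(interval)
        return job_id
    finally:
        # Runs on normal return and on exceptions (including KeyboardInterrupt),
        # but NOT if the process is killed outright, e.g. with SIGKILL.
        stop(job_id)
```

Because `finally` only runs in the thread where the exception is raised, this helps only when the interrupt actually reaches the code that is polling.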
k
Uhh… actually, now that Orion is released, we’re gonna bump Prefect Core to 1.0 after a couple of fixes, and I think this is one of the fixes on the roadmap.
But that’s pretty clever. I think it might work.
a
That would be great! 🙂 I tried it out, but unfortunately Dask fails pretty hard. I look forward to an official solution, and good luck on the Orion launch! We’ve already marked Q1/Q2 next year for investigation and integration. 🙌
k
Thanks for letting me know!