
Alexander van Eck

09/27/2021, 5:14 PM
Hello 👋 I’m trying to understand state_handlers in Task. When a Task is currently running and I cancel the flow are the state_handlers executed? I ask b/c I have a task which calls out to a remote resource and waits, then when the flow is cancelled that task should let the remote resource know we’re no longer waiting.
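from typing import Any, Union

from prefect import Flow, Task
from prefect.engine.state import Cancelled, Cancelling, State


def cancel(obj: Union[Task, Flow], _: State, new_state: State) -> State:
    # State handler: react only to cancellation transitions.
    if isinstance(new_state, (Cancelling, Cancelled)):
        obj.some_arg.cancel()  # let them know we're no longer waiting

    return new_state


class RemoteTask(Task):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Copy the caller's list so appending does not mutate it.
        state_handlers = list(kwargs.pop('state_handlers', None) or [])
        state_handlers.append(cancel)
        kwargs['state_handlers'] = state_handlers
        super().__init__(*args, **kwargs)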

Kevin Kho

09/27/2021, 5:23 PM
Hey @Alexander van Eck, we’ve discussed this a bit internally. We do recognize there is a problem with having graceful shutdowns when flows are cancelled. There is an issue here (I think what you describe is similar)

Alexander van Eck

09/27/2021, 5:43 PM
This is exactly our use case! We have very long running tasks and sometimes they lock up the compute for _hours_… 😬 Any timeline on this?

Kevin Kho

09/27/2021, 5:49 PM
Unfortunately not in the short term. I really don’t know, because it needs to be designed and scoped, and the solution may even extend beyond state handlers (think agent handlers). It came up in the last product roadmap but was deferred a bit because we have more immediate stuff coming up.

Alexander van Eck

09/27/2021, 9:31 PM
Understandable. Could I perhaps ask for some pointers on how to extend CloudFlowRunner to get this to work for the project I’m working on? I’m thinking the SIGINT that `CloudFlowRunner.check_for_cancellation.interrupt_if_cancelling` throws back to the main thread should be responded to somehow by the main thread, which would then go over all tasks and call their state_handlers?
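The idea in the message above (the main thread reacts to the interrupt and walks over the running tasks) can be sketched in plain Python. This is only an illustration under stated assumptions: `CleanupRegistry`, `register`, and `run` are hypothetical names, not part of Prefect or `CloudFlowRunner`, and the interrupt is assumed to surface in the main thread as a `KeyboardInterrupt`.

```python
from typing import Callable, List


class CleanupRegistry:
    """Hypothetical helper (not a Prefect API): collects per-task cleanup
    callbacks and runs them if the main thread is interrupted."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], None]] = []

    def register(self, callback: Callable[[], None]) -> None:
        # Each long-running task would register how to release its remote resource.
        self._callbacks.append(callback)

    def run(self, work: Callable[[], None]) -> None:
        try:
            work()
        except KeyboardInterrupt:
            # Assumption: the cancellation arrives as KeyboardInterrupt (SIGINT).
            # Run callbacks in reverse registration order, then re-raise.
            for callback in reversed(self._callbacks):
                try:
                    callback()
                except Exception as exc:
                    # One failing cleanup should not block the others.
                    print(f"cleanup failed: {exc!r}")
            raise
        finally:
            self._callbacks.clear()
```

Re-raising after the callbacks keeps the interrupt visible to whatever sits above, so the run still ends as interrupted rather than appearing to finish normally.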

Kevin Kho

09/27/2021, 9:32 PM
Yeah wanna put that in the issue?

Alexander van Eck

09/27/2021, 9:36 PM
Added

Kevin Kho

09/27/2021, 9:36 PM
Thank you 🙂
🙌 1
I saw on GitHub you asked for a workaround, and I’m not sure there is one, because this might need to be a separate process independent of the flow

Alexander van Eck

09/28/2021, 7:22 AM
Yeah - I’ll have to investigate and understand more of the FlowRunner to see where to add to it. 🤔
@Kevin Kho Just read all of the Orion docs. Good stuff! Is the logic surrounding cancellations of flows and how to clean up tasks going to be addressed in 2.0? E.g. will tasks be able to execute a `finally` clause?
from time import sleep

import requests
from prefect import task


@task
def long_running_task():
    response = requests.post(...)  # start long running job
    try:
        while True:
            status = requests.get(...).text  # check if job is done
            if status == 'ok':
                break
            sleep(10)
    finally:
        requests.delete(...)  # tell job to stop running
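A minimal, network-free sketch of the plain-Python behaviour this snippet relies on: when a `KeyboardInterrupt` propagates out of the polling loop, the `finally` block still runs. `FakeRemoteJob` and `wait_for_job` are hypothetical stand-ins, not a real API, and the interrupt is simulated by raising inside `status()`. Whether a cancellation actually reaches the task's thread or process under a given executor is a separate question that this sketch does not answer.

```python
import time
from typing import List


class FakeRemoteJob:
    """Hypothetical stand-in for the remote job API; it only records calls."""

    def __init__(self, interrupt_on_poll: int) -> None:
        self.calls: List[str] = []
        self._polls = 0
        self._interrupt_on_poll = interrupt_on_poll

    def start(self) -> None:
        self.calls.append("start")

    def status(self) -> str:
        self._polls += 1
        self.calls.append("status")
        if self._polls == self._interrupt_on_poll:
            # Simulates a SIGINT arriving while the task is waiting.
            raise KeyboardInterrupt
        return "running"

    def stop(self) -> None:
        self.calls.append("stop")


def wait_for_job(job: FakeRemoteJob, poll_interval: float = 0.0) -> None:
    job.start()
    try:
        while job.status() != "ok":
            time.sleep(poll_interval)
    finally:
        # Runs on normal completion and on interruption alike.
        job.stop()
```

Because `finally` also runs on success, the stop call must be safe to issue against a job that has already finished.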

Kevin Kho

10/06/2021, 1:30 PM
Uhh… actually, now that Orion is released, we’re gonna bump Prefect Core to a 1.0 after a couple of fixes, and I think this is one of the fixes that is on the roadmap
But that’s pretty clever, I think it might work

Alexander van Eck

10/07/2021, 12:54 PM
That would be great! 🙂 I tried it out but unfortunately Dask fails pretty hard. Looking forward to an official solution, and good luck on the Orion launch! We’ve already marked Q1/Q2 next year for investigation and integration. 🙌
👍 1

Kevin Kho

10/07/2021, 1:55 PM
Thanks for letting me know!