Thread
#prefect-server
    Alexander van Eck

    11 months ago
    Hello 👋 I’m trying to understand state_handlers in Task. When a Task is currently running and I cancel the flow, are the state_handlers executed? I ask b/c I have a task which calls out to a remote resource and waits; when the flow is cancelled, that task should let the remote resource know we’re no longer waiting.
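    from typing import Any, Union

    from prefect import Flow, Task
    from prefect.engine.state import Cancelled, Cancelling, State


    def cancel(obj: Union[Task, Flow], _: State, new_state: State) -> State:
        # State handlers are called as (obj, old_state, new_state).
        if isinstance(new_state, (Cancelling, Cancelled)):
            obj.some_arg.cancel()  # let them know we're no longer waiting

        return new_state


    class RemoteTask(Task):
        def __init__(self, *args: Any, **kwargs: Any) -> None:
            # Copy the list so the caller's state_handlers is not mutated.
            state_handlers = list(kwargs.pop('state_handlers', None) or [])
            state_handlers.append(cancel)
            kwargs['state_handlers'] = state_handlers
            super().__init__(*args, **kwargs)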
    Kevin Kho

    11 months ago
    Hey @Alexander van Eck, we’ve discussed this a bit internally. We do recognize there is a problem with having graceful shutdowns when flows are cancelled. There is an issue here (I think what you describe is similar)
    Alexander van Eck

    11 months ago
    This is exactly our use case! We have very long running tasks and sometimes they lock up the compute for _hours_… 😬 Any timeline on this?
    Kevin Kho

    11 months ago
    Unfortunately not in the short term. I really don’t know, because it needs to be designed and scoped, and the solution may even extend beyond state handlers (think agent handlers). It came up in the last product roadmap but was deferred a bit because we have more immediate stuff coming up
    Alexander van Eck

    11 months ago
    Understandable. Could I perhaps ask for some pointers on how to extend CloudFlowRunner to get this to work for the project I’m working on? I’m thinking the SIGINT that CloudFlowRunner.check_for_cancellation.interrupt_if_cancelling throws back to the main thread should be responded to somehow by the main thread, which would then go over all tasks to call their state_handler?
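
A minimal sketch of the idea in the message above, using only the Python standard library and no Prefect APIs: a background thread sends SIGINT to the main thread, and the main thread catches the resulting KeyboardInterrupt and walks a list of cleanup callbacks. The names `run_with_cleanup` and `fake_cancellation_watcher` are hypothetical, and `signal.pthread_kill` is only available on Unix-like systems.

```python
import signal
import threading
import time
from typing import Callable, List


def run_with_cleanup(
    work: Callable[[], None],
    cleanup_callbacks: List[Callable[[], None]],
) -> bool:
    """Run `work` in the calling thread; if it is interrupted, run every cleanup.

    Returns True if `work` finished, False if a KeyboardInterrupt stopped it.
    """
    try:
        work()
        return True
    except KeyboardInterrupt:
        for callback in cleanup_callbacks:
            try:
                callback()  # e.g. tell a remote resource we stopped waiting
            except Exception as exc:
                # One failing cleanup should not prevent the others from running.
                print(f"cleanup failed: {exc!r}")
        return False


def fake_cancellation_watcher(delay: float) -> threading.Thread:
    """Hypothetical stand-in for a cancellation poller.

    After `delay` seconds it sends SIGINT to the main thread (Unix only).
    """
    def interrupt() -> None:
        time.sleep(delay)
        signal.pthread_kill(threading.main_thread().ident, signal.SIGINT)

    thread = threading.Thread(target=interrupt, daemon=True)
    thread.start()
    return thread
```

Calling `fake_cancellation_watcher(0.1)` and then `run_with_cleanup(lambda: time.sleep(5), [...])` from the main thread should interrupt the sleep and run the callbacks. This only covers work running in the main thread of the same process.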
    Kevin Kho

    11 months ago
    Yeah wanna put that in the issue?
    Alexander van Eck

    11 months ago
    Added
    Kevin Kho

    11 months ago
    Thank you 🙂
    I saw on GitHub you asked for a workaround, and I’m not sure there is one, because this might need to be a separate process independent of the flow
    Alexander van Eck

    11 months ago
    Yeah - I’ll have to investigate and understand more of the FlowRunner to see where to add to it. 🤔
    @Kevin Kho Just read all of the Orion docs. Good stuff! Is the logic surrounding cancellations of flows and how to clean up tasks going to be addressed in 2.0? f.e. will tasks be able to execute a finally clause?
    from time import sleep

    import requests  # assuming the requests library is what is meant here
    from prefect import task


    @task
    def long_running_task():
        response = requests.post(...)  # start long running job
        try:
            while True:
                status = requests.get(...).text  # check if job is done
                if status == 'ok':
                    break
                sleep(10)
        finally:
            requests.delete(...)  # tell job to stop running
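
A self-contained sketch of the same try/finally pattern, with the HTTP calls replaced by plain callables so it runs without a network or Prefect. The names `poll_until_done`, `start_job`, `check_status`, and `stop_job` are hypothetical. The `finally` block runs whether the loop finishes, raises, or is interrupted by a KeyboardInterrupt; it cannot run if the process is killed outright.

```python
import time
from typing import Callable


def poll_until_done(
    start_job: Callable[[], None],
    check_status: Callable[[], str],
    stop_job: Callable[[], None],
    interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Start a job, poll until its status is 'ok', and always call stop_job."""
    start_job()
    try:
        while True:
            status = check_status()
            if status == "ok":
                return status
            sleep(interval)
    finally:
        # Runs on normal completion, on exceptions, and on KeyboardInterrupt.
        stop_job()
```

Passing `sleep` as a parameter is only a convenience for trying the function without waiting.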
    Kevin Kho

    11 months ago
    Uhh…actually now that Orion is released, we’re gonna bump Prefect Core to a 1.0 after a couple of fixes and I think this is one of the fixes that is on the roadmap
    But that’s pretty clever, I think it might work
    Alexander van Eck

    11 months ago
    That would be great! 🙂 I tried it out but unfortunately Dask fails pretty hard. Looking forward to an official solution, and good luck on the Orion launch! We’ve already marked Q1/Q2 next year for investigation and integration. 🙌
    Kevin Kho

    11 months ago
    Thanks for letting me know!