Thread
#prefect-community
    Lucas Beck

    1 year ago
    Hi everyone, I have a flow that spins up n different jobs in a k8s cluster to perform some computationally heavy tasks. Sometimes, when I cancel the flow or it fails for whatever reason, I then need to manually clean up the n jobs in the k8s cluster. To avoid this manual work, I am trying to create a state handler to deal with that. The handler works for the Failed state, but not for cancellation. I have tried both Cancelling and Cancelled. For illustration purposes, there is an example below:
    import os
    import uuid
    import time
    import prefect
    from prefect import Flow, Task, Parameter
    from prefect.executors.dask import LocalDaskExecutor
    from prefect.utilities.edges import unmapped
    
    
    def clean_up(task, old_state, new_state):
        # Task state handler: log a cleanup message on failure or cancellation.
        logger = prefect.context.get("logger")
        task_full_name = prefect.context.get("task_full_name")
        if isinstance(new_state, prefect.engine.state.Failed):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif isinstance(new_state, prefect.engine.state.Cancelled):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif isinstance(new_state, prefect.engine.state.Cancelling):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
    
    
    class DoSomeComputation(Task):
        def __init__(self, **kwargs):
            self.run_id = str(uuid.uuid4())[:8]
            super().__init__(**kwargs)
    
        def run(self, input, fail=True):
            logger = prefect.context.get("logger")
            logger.info(f"Started some computation task with input {input}")
            # Simulate roughly 200 seconds of work.
            for i in range(0, 200):
                time.sleep(1)
            if fail:
                raise Exception("Something terrible happened")
            logger.info("Successfully completed some computation task")
    
    
    if __name__ == "__main__":
        project_name = "test-flows"
        flow_name = "test-cleanup"
        with Flow(...) as flow:
            fail = Parameter("fail")
            do_some_comp = DoSomeComputation(
                state_handlers=[clean_up], task_run_name="{task_name}-job-name={input}"
            )
            inputs = ["input1", "input2", "input3"]
            do_some_comp.map(inputs, unmapped(fail))
    
        flow.executor = LocalDaskExecutor(num_workers=20)
        flow.register(project_name=project_name)
    The "cleaning up job..." log never gets triggered when I cancel the job. Does anyone have an idea, or has anyone else experienced this? PS: I am using the KubernetesRun run config.
    When I run this and look at the main job pod, the logs are as follows:
    [2021-09-21 08:19:47+0000] INFO - prefect.Azure | Downloading test-cleanup/2021-09-21t08-19-32-987132-00-00 from prefect-flows
    [2021-09-21 08:19:47+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'test-cleanup'
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[list]': Starting task run...
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'fail': Starting task run...
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'fail': Finished task run for task with final state: 'Success'
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[list]': Finished task run for task with final state: 'Success'
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation': Starting task run...
    [2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation | starting job
    [2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation | cleaning up job 9c00c730-DoSomeComputation due to new state being <Mapped: "Ready to proceed with mapping.">
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation': Finished task run for task with final state: 'Mapped'
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[0]': Starting task run...
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[1]': Starting task run...
    [2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[2]': Starting task run...
    [2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[0] | starting job
    [2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[1] | starting job
    [2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[2] | starting job
    [2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[2] | Started some computation task with input input3
    [2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[0] | Started some computation task with input input1
    [2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[1] | Started some computation task with input input2
    [2021-09-21 08:20:03+0000] INFO - prefect.CloudFlowRunner | Flow run has been cancelled, cancelling active tasks
    [2021-09-21 08:20:03+0000] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
    Exception in thread Thread-19:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    Exception in thread Thread-20:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    Exception in thread Thread-21:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        task = get()
    KeyboardInterrupt
    Exception in thread Thread-4:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    Exception in thread Thread-3:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        task = get()
    KeyboardInterrupt
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        task = get()
    KeyboardInterrupt
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
    Exception in thread Thread-6:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        task = get()
    KeyboardInterrupt
    Exception in thread Thread-5:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    Exception in thread Thread-10:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self.run()
      File "/usr/local/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        task = get()
    KeyboardInterrupt
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        task = get()
    KeyboardInterrupt
        task = get()
    KeyboardInterrupt
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
        task = get()
    KeyboardInterrupt
        task = get()
    KeyboardInterrupt
    Amanda Wee

    1 year ago
    My guess is that the issue is that Cancelled and Cancelling are flow run states, whereas here, judging by the first parameter name, you're concerned with the state of a task run.
    I'm guessing you have to do the same cleanup on success too, so what if you checked new_state.is_finished()?
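A minimal sketch of that check, written without importing Prefect so it runs on its own. `StubState` is a hypothetical stand-in for a Prefect state object; the only thing assumed about a real state is that it exposes `is_finished()`, as suggested above. The handler keeps the `(task, old_state, new_state)` signature used in the thread.

```python
from dataclasses import dataclass


@dataclass
class StubState:
    """Hypothetical stand-in for a Prefect state object, for illustration only."""

    name: str
    finished: bool

    def is_finished(self) -> bool:
        return self.finished


cleaned_up = []  # records (task, state name) pairs the handler acted on


def clean_up(task, old_state, new_state):
    """Task state handler: clean up once the task run reaches any finished state."""
    if new_state.is_finished():
        cleaned_up.append((task, new_state.name))
    # Returning the state leaves the transition itself unchanged.
    return new_state


# Simulate two transitions for one task run.
clean_up("job-a", StubState("Pending", False), StubState("Running", False))
clean_up("job-a", StubState("Running", False), StubState("Failed", True))
print(cleaned_up)
```

Note that the pod log earlier in the thread shows the handler also firing for the parent task's Mapped state, so a real handler may need to skip that transition.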
    Lucas Beck

    1 year ago
    Hi @Amanda Wee, thank you for your help. I have now tried many different checks (including is_finished), none of which seem to work:
    def clean_up(task, old_state, new_state):
        logger = prefect.context.get("logger")
        logger.info("starting job")
        task_full_name = prefect.context.get("task_full_name")
        if isinstance(new_state, prefect.engine.state.Failed):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif isinstance(new_state, prefect.engine.state.Cancelled):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif isinstance(new_state, prefect.engine.state.Cancelling):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif isinstance(new_state, prefect.engine.state.Finished):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif new_state.is_finished():
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
    Amanda Wee

    1 year ago
    I was curious, so I wrote a test flow:
    import time
    
    import prefect
    
    
    def task_state_handler(task, old_state, new_state):
        # Log every task state transition.
        logger = prefect.context.get("logger")
        logger.info(f"task {task.name} has moved from {old_state} state to {new_state} state")
    
    
    @prefect.task(state_handlers=[task_state_handler])
    def foo():
        logger = prefect.context.get("logger")
        logger.info("starting task foo")
        time.sleep(20)
        logger.info("ending task foo")
    
    
    @prefect.task(state_handlers=[task_state_handler])
    def bar():
        logger = prefect.context.get("logger")
        logger.info("starting task bar")
        time.sleep(10)
        logger.info("ending task bar")
    
    
    def flow_state_handler(flow, old_state, new_state):
        # Log only when the flow run reaches a finished state.
        logger = prefect.context.get("logger")
        if new_state.is_finished():
            logger.info(f"test flow has finished with {new_state} state")
    
    
    def main():
        with prefect.Flow("test_flow", state_handlers=[flow_state_handler]) as flow:
            t1 = foo()
            bar(upstream_tasks=[t1])
        
        prefect.Client().create_project(project_name="test-project")
        flow.register(project_name="test-project")
    
    
    if __name__ == "__main__":
        main()
    I got this log output:
    Beginning Flow run for 'test_flow'
    Task 'foo': Starting task run...
    task foo has moved from <Pending: "Task run created"> state to <Running: "Starting task run."> state
    starting task foo
    Flow run has been cancelled, cancelling active tasks
    test flow has finished with <Cancelled: "Flow run is cancelled"> state
    which appears to indicate that my hunch is correct: your task state handler doesn't register anything because the state change happens to the flow, not to the tasks that are cancelled when the flow is cancelled. Hence, your approach won't work. You could try creating a flow state handler, or perhaps look into ResourceManager: https://docs.prefect.io/core/idioms/resource-manager.html
    Oh, actually, some bad news. I just noticed this posted in a thread started by Martin: https://github.com/PrefectHQ/prefect/issues/4988 (However, this seems to apply to manual cancellation only, so in general you could still go ahead.)
    Lucas Beck

    1 year ago
    Oh yeah, I am looking for manual cancellation. I guess I will be waiting for that feature. Again, thanks for the help @Amanda Wee.
    Regarding "You could try creating a flow state handler, or perhaps look into ResourceManager: https://docs.prefect.io/core/idioms/resource-manager.html": that won't work for me, as I need to shut down k8s jobs that are created within the tasks, so I need access to the tasks' attributes.
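One possible way around the need for task attributes, sketched here under assumptions the thread does not confirm: if every k8s job were labelled with the flow run ID when it is created, a flow-level state handler could find the jobs by label instead. Everything named below (`job_label_selector`, `make_cleanup_handler`, the `flow-run-id` label key, `StubState`) is hypothetical. In a real flow, `get_flow_run_id` would presumably read the ID from the Prefect context and `delete_jobs` would call the Kubernetes API with a label selector. Whether a flow-level handler fires reliably on manual cancellation is what the linked issue is about, so treat this as untested.

```python
from typing import Callable, List


def job_label_selector(flow_run_id: str) -> str:
    """Build the label selector marking jobs that belong to one flow run.

    The label key "flow-run-id" is a hypothetical naming choice.
    """
    return f"flow-run-id={flow_run_id}"


def make_cleanup_handler(
    get_flow_run_id: Callable[[], str],
    delete_jobs: Callable[[str], None],
):
    """Return a flow state handler that deletes labelled jobs on any finished state."""

    def handler(flow, old_state, new_state):
        if new_state.is_finished():
            delete_jobs(job_label_selector(get_flow_run_id()))
        # Returning the state leaves the transition itself unchanged.
        return new_state

    return handler


class StubState:
    """Hypothetical stand-in for a Prefect state, for this demo only."""

    def __init__(self, finished: bool):
        self._finished = finished

    def is_finished(self) -> bool:
        return self._finished


deleted: List[str] = []  # selectors passed to the fake delete function
handler = make_cleanup_handler(lambda: "abc123", deleted.append)
handler(None, StubState(False), StubState(False))  # e.g. Pending -> Running
handler(None, StubState(False), StubState(True))   # e.g. Running -> Cancelled
print(deleted)
```

Injecting the two callables keeps the handler free of any cluster or Prefect dependency, so the cleanup logic can be exercised locally before wiring it to a real flow.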
    Kevin Kho

    1 year ago
    Hey, I think that was you who chimed in on the issue? Thanks for that.
    Lucas Beck

    1 year ago
    Yep, indeed 🙂 You're welcome.