Lucas Beck

09/21/2021, 8:07 AM
Hi everyone, I have a flow that spins up `n` different jobs in a k8s cluster to perform some computationally heavy tasks. Sometimes, when I cancel the flow or it fails for whatever reason, I then need to manually clean up the `n` jobs in the k8s cluster. To avoid this manual work, I am trying to create a state handler to deal with that. The handler works for the failed state, but not for cancellation. I have tried both `Cancelling` and `Cancelled`. For illustration purposes, there is an example below:
import uuid
import time
import prefect
from prefect import Flow, Task, Parameter
from prefect.executors.dask import LocalDaskExecutor
from prefect.utilities.edges import unmapped


def clean_up(task, old_state, new_state):
    logger = prefect.context.get("logger")
    task_full_name = prefect.context.get("task_full_name")
    if isinstance(new_state, prefect.engine.state.Failed):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelled):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelling):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )


class DoSomeComputation(Task):
    def __init__(self, **kwargs):
        self.run_id = str(uuid.uuid4())[:8]
        super().__init__(**kwargs)

    def run(self, input, fail=True):
        logger = prefect.context.get("logger")
        logger.info(f"Started some computation task with input {input}")
        for _ in range(200):
            time.sleep(1)
        if fail:
            raise Exception("Something terrible happened")
        logger.info("Successfully completed some computation task")


if __name__ == "__main__":
    project_name = "test-flows"
    flow_name = "test-cleanup"
    with Flow(...) as flow:
        fail = Parameter("fail")
        do_some_comp = DoSomeComputation(
            state_handlers=[clean_up], task_run_name="{task_name}-job-name={input}"
        )
        inputs = ["input1", "input2", "input3"]
        do_some_comp.map(inputs, unmapped(fail))

    flow.executor = LocalDaskExecutor(num_workers=20)
    flow.register(project_name=project_name)
The "cleaning up job..." log never gets triggered when I cancel the flow. Does anyone have an idea, or has anyone experienced this too? PS: I am using the `KubernetesRun` flow config.
When I run this and look at the main job pod, the logs are as follows:
[2021-09-21 08:19:47+0000] INFO - prefect.Azure | Downloading test-cleanup/2021-09-21t08-19-32-987132-00-00 from prefect-flows
[2021-09-21 08:19:47+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'test-cleanup'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[list]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'fail': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'fail': Finished task run for task with final state: 'Success'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[list]': Finished task run for task with final state: 'Success'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation | starting job
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation | cleaning up job 9c00c730-DoSomeComputation due to new state being <Mapped: "Ready to proceed with mapping.">
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation': Finished task run for task with final state: 'Mapped'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[0]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[1]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[2]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[0] | starting job
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[1] | starting job
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[2] | starting job
[2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[2] | Started some computation task with input input3
[2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[0] | Started some computation task with input input1
[2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[1] | Started some computation task with input input2
[2021-09-21 08:20:03+0000] INFO - prefect.CloudFlowRunner | Flow run has been cancelled, cancelling active tasks
[2021-09-21 08:20:03+0000] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
Exception in thread Thread-19:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
Exception in thread Thread-20:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
Exception in thread Thread-21:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
KeyboardInterrupt
Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
KeyboardInterrupt
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    task = get()
KeyboardInterrupt
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
Exception in thread Thread-6:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
KeyboardInterrupt
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
KeyboardInterrupt
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
KeyboardInterrupt
    task = get()
KeyboardInterrupt
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
KeyboardInterrupt
    task = get()
KeyboardInterrupt

Amanda Wee

09/21/2021, 8:26 AM
My guess is that the issue is that `Cancelled` and `Cancelling` are flow run states, whereas here, judging by the first parameter name, you're concerned with the state of a task run.
I'm guessing you have to do the same cleanup on success too, so what if you checked `new_state.is_finished()`?
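A minimal sketch of that suggestion, assuming the Prefect 1.x task state handler signature `(task, old_state, new_state)` and that state objects expose `is_finished()`. To keep it self-contained it uses the standard `logging` module and stand-in task/state classes instead of importing Prefect; `make_cleanup_handler` and the `delete_job` callback are hypothetical names, where the real callback would delete the k8s job.

```python
import logging
from typing import Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("task-cleanup-sketch")


def make_cleanup_handler(delete_job: Callable[[str], None]):
    """Build a task state handler with the (task, old_state, new_state) signature.

    `delete_job` is a hypothetical callback that removes the k8s job by name.
    """

    def clean_up(task, old_state, new_state):
        # is_finished() is True for any terminal state (e.g. Success or Failed),
        # so one check covers both the success and the failure path.
        if new_state.is_finished():
            job_name = f"{task.run_id}-{task.name}"
            logger.info("cleaning up job %s due to new state %s", job_name, new_state)
            delete_job(job_name)
        # Returning new_state leaves the state transition unchanged.
        return new_state

    return clean_up


# Usage with stand-ins for a Prefect task and its states.
class StubState:
    def __init__(self, label: str, finished: bool):
        self.label, self.finished = label, finished

    def is_finished(self) -> bool:
        return self.finished

    def __repr__(self) -> str:
        return f"<{self.label}>"


class StubTask:
    name = "DoSomeComputation"
    run_id = "9c00c730"  # mirrors the custom run_id attribute in the example above


deleted = []
handler = make_cleanup_handler(deleted.append)
handler(StubTask(), StubState("Pending", False), StubState("Running", False))
handler(StubTask(), StubState("Running", False), StubState("Failed", True))
print(deleted)  # ['9c00c730-DoSomeComputation']
```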

Lucas Beck

09/21/2021, 9:19 AM
Hi @Amanda Wee, thank you for your help. I have now tried many different checks (including `Finished` and `is_finished()`), none of which seem to work:
def clean_up(task, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info("starting job")
    task_full_name = prefect.context.get("task_full_name")
    if isinstance(new_state, prefect.engine.state.Failed):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelled):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelling):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Finished):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif new_state.is_finished():
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )

Amanda Wee

09/21/2021, 10:02 AM
I was curious, so I wrote a test flow:
import time

import prefect


def task_state_handler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info(f"task {task.name} has moved from {old_state} state to {new_state} state")


@prefect.task(state_handlers=[task_state_handler])
def foo():
    logger = prefect.context.get("logger")
    logger.info("starting task foo")
    time.sleep(20)
    logger.info("ending task foo")


@prefect.task(state_handlers=[task_state_handler])
def bar():
    logger = prefect.context.get("logger")
    logger.info("starting task bar")
    time.sleep(10)
    logger.info("ending task bar")


def flow_state_handler(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info(f"test flow has finished with {new_state} state")


def main():
    with prefect.Flow("test_flow", state_handlers=[flow_state_handler]) as flow:
        t1 = foo()
        bar(upstream_tasks=[t1])

    prefect.Client().create_project(project_name="test-project")
    flow.register(project_name="test-project")


if __name__ == "__main__":
    main()
I got this log output:
Beginning Flow run for 'test_flow'
Task 'foo': Starting task run...
task foo has moved from <Pending: "Task run created"> state to <Running: "Starting task run."> state
starting task foo
Flow run has been cancelled, cancelling active tasks
test flow has finished with <Cancelled: "Flow run is cancelled"> state
This appears to confirm my hunch: your task state handler never logs anything because the state change happens to the flow run, not to the tasks that are cancelled along with it. Hence, your approach won't work. You could try creating a flow state handler, or perhaps look into `ResourceManager`: https://docs.prefect.io/core/idioms/resource-manager.html
Oh, actually, some bad news. I just noticed this posted in a thread started by Martin: https://github.com/PrefectHQ/prefect/issues/4988 (However, this seems to apply only to manual cancellation, so in general you could still go ahead.)

Lucas Beck

09/21/2021, 11:04 AM
Oh yeah, manual cancellation is exactly what I am looking for. I guess I will wait for that feature. Thanks again for the help, @Amanda Wee!
> You could try creating a flow state handler, or perhaps look into `ResourceManager`: https://docs.prefect.io/core/idioms/resource-manager.html
That won't work for me, as I need to shut down k8s jobs that are created within the tasks, so I need access to the tasks' attributes.
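If the cleanup has to stay next to the task's own attributes, one option is to wrap the job's lifetime in `try`/`finally` inside the task's `run()`. A minimal sketch, with hypothetical `create_job`, `wait_for_job` and `delete_job` callables standing in for Kubernetes client calls. It only helps if cancellation actually surfaces as an exception in the task's thread, which the logs above do not confirm, and no cleanup runs if the pod is killed outright.

```python
from typing import Callable


def run_job(job_input: str,
            create_job: Callable[[str], str],
            wait_for_job: Callable[[str], None],
            delete_job: Callable[[str], None]) -> None:
    """Create one k8s job, wait for it, and always try to delete it afterwards.

    create_job / wait_for_job / delete_job are hypothetical stand-ins for
    Kubernetes client calls; in the real task this body would live in run().
    """
    job_name = create_job(job_input)
    try:
        # Blocks until the job finishes; raises if it fails or is interrupted.
        wait_for_job(job_name)
    finally:
        # Runs on normal return and on any exception, including KeyboardInterrupt,
        # but not if the process is killed outright.
        delete_job(job_name)


# Usage: simulate the executor interrupting the task while it waits.
deleted = []


def interrupted_wait(job_name: str) -> None:
    raise KeyboardInterrupt


try:
    run_job("input1", lambda i: f"job-{i}", interrupted_wait, deleted.append)
except KeyboardInterrupt:
    pass

print(deleted)  # ['job-input1']
```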

Kevin Kho

09/21/2021, 1:31 PM
Hey, I think that was you who chimed in on the issue? Thanks for that.

Lucas Beck

09/21/2021, 2:02 PM
Yep, indeed 🙂 You're welcome!