Lucas Beck
09/21/2021, 8:07 AM
n different jobs in a k8s cluster to perform some computationally heavy tasks. Sometimes, when I cancel the flow or it fails for whatever reason, I then need to manually clean up the n jobs in the k8s cluster. To avoid this manual work, I am trying to create a state handler to deal with that. The handler works for the failed state, but not for cancellation. I have tried both `Cancelling` and `Cancelled`. For illustration purposes, here is an example:
import uuid
import time

import prefect
from prefect import Flow, Task, Parameter
from prefect.executors.dask import LocalDaskExecutor
from prefect.utilities.edges import unmapped


def clean_up(task, old_state, new_state):
    logger = prefect.context.get("logger")
    task_full_name = prefect.context.get("task_full_name")
    if isinstance(new_state, prefect.engine.state.Failed):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelled):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelling):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )


class DoSomeComputation(Task):
    def __init__(self, **kwargs):
        # Short random id, used in the job name that clean_up logs
        self.run_id = str(uuid.uuid4())[:8]
        super().__init__(**kwargs)

    def run(self, input, fail=True):
        logger = prefect.context.get("logger")
        logger.info(f"Started some computation task with input {input}")
        # Simulate ~200 s of heavy work
        for i in range(0, 200):
            time.sleep(1)
        if fail:
            raise Exception("Something terrible happened")
        logger.info("Successfully completed some computation task")


if __name__ == "__main__":
    project_name = "test-flows"
    flow_name = "test-cleanup"
    with Flow(...) as flow:
        fail = Parameter("fail")
        do_some_comp = DoSomeComputation(
            state_handlers=[clean_up], task_run_name="{task_name}-job-name={input}"
        )
        inputs = ["input1", "input2", "input3"]
        do_some_comp.map(inputs, unmapped(fail))
    flow.executor = LocalDaskExecutor(num_workers=20)
    flow.register(project_name=project_name)
The "cleaning up job..." log never gets triggered when I cancel the flow run. Does anyone have an idea, or has anyone else experienced this?
PS: I am using the `KubernetesRun` flow config.
[2021-09-21 08:19:47+0000] INFO - prefect.Azure | Downloading test-cleanup/2021-09-21t08-19-32-987132-00-00 from prefect-flows
[2021-09-21 08:19:47+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'test-cleanup'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[list]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'fail': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'fail': Finished task run for task with final state: 'Success'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[list]': Finished task run for task with final state: 'Success'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation | starting job
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation | cleaning up job 9c00c730-DoSomeComputation due to new state being <Mapped: "Ready to proceed with mapping.">
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation': Finished task run for task with final state: 'Mapped'
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[0]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[1]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.CloudTaskRunner | Task 'DoSomeComputation[2]': Starting task run...
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[0] | starting job
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[1] | starting job
[2021-09-21 08:19:48+0000] INFO - prefect.DoSomeComputation[2] | starting job
[2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[2] | Started some computation task with input input3
[2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[0] | Started some computation task with input input1
[2021-09-21 08:19:49+0000] INFO - prefect.DoSomeComputation[1] | Started some computation task with input input2
[2021-09-21 08:20:03+0000] INFO - prefect.CloudFlowRunner | Flow run has been cancelled, cancelling active tasks
[2021-09-21 08:20:03+0000] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
The following traceback was printed, interleaved, by each of Thread-19, Thread-20, Thread-21, Thread-4, Thread-3, Thread-2, Thread-6, Thread-5 and Thread-10:
Exception in thread Thread-19:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 114, in worker
    task = get()
KeyboardInterrupt
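The tracebacks above show the `LocalDaskExecutor` interrupting its worker threads with `KeyboardInterrupt` rather than moving each task run through a new state, which may be why the task state handler never sees a cancellation. One workaround worth trying is to put the cleanup inside the task body with `try`/`finally`, so it runs however the body exits. This is a plain-Python sketch, not Prefect API: `JobRunner`, `create_job` and `delete_job` are hypothetical stand-ins for the task and the real Kubernetes calls. Whether the interrupt actually reaches a running task body depends on the executor, and nothing in `finally` runs if the flow-run pod itself is terminated.

```python
from typing import Callable, List


class JobRunner:
    """Toy stand-in for DoSomeComputation.run() that always cleans up its jobs.

    create_job and delete_job are hypothetical callables standing in for the
    real Kubernetes client calls; they are not part of Prefect.
    """

    def __init__(self, create_job: Callable[[str], str], delete_job: Callable[[str], None]) -> None:
        self.create_job = create_job
        self.delete_job = delete_job

    def run(self, inputs: List[str], work: Callable[[], None]) -> None:
        created: List[str] = []
        try:
            for item in inputs:
                # Record each job as soon as it exists so a later failure still cleans it up.
                created.append(self.create_job(item))
            work()  # the long-running part; may raise or be interrupted
        finally:
            # Runs on normal return, on exceptions and on KeyboardInterrupt alike;
            # the original exception keeps propagating afterwards.
            for name in created:
                self.delete_job(name)


if __name__ == "__main__":
    deleted: List[str] = []
    runner = JobRunner(create_job=lambda x: f"job-{x}", delete_job=deleted.append)

    def interrupted_work() -> None:
        raise KeyboardInterrupt

    try:
        runner.run(["input1", "input2"], interrupted_work)
    except KeyboardInterrupt:
        pass
    print(deleted)  # ['job-input1', 'job-input2']
```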
Amanda Wee
09/21/2021, 8:26 AM
`Cancelled` and `Cancelling` are flow run states, whereas here, judging by the first parameter name, you're concerned with the state of a task run. Have you tried `new_state.is_finished()`?
Lucas Beck
09/21/2021, 9:19 AM
def clean_up(task, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info("starting job")
    task_full_name = prefect.context.get("task_full_name")
    if isinstance(new_state, prefect.engine.state.Failed):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelled):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelling):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Finished):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif new_state.is_finished():
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
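Since every branch above logs the same message, the chain could probably collapse into a single `is_finished()` check. To stay self-contained, this sketch uses small stand-in classes named after Prefect 1.x's states rather than `prefect.engine.state` itself, and its `clean_up` takes a task name instead of a task object. It also skips `Mapped`: the `<Mapped: "Ready to proceed with mapping.">` line in the log above shows the parent of the mapped task passing through the handler, even though only its children run jobs.

```python
from typing import List


# Toy stand-ins mirroring the shape of Prefect 1.x's state hierarchy;
# in a real flow you would import these from prefect.engine.state.
class State:
    def is_finished(self) -> bool:
        return isinstance(self, Finished)


class Running(State): ...
class Finished(State): ...
class Success(Finished): ...
class Failed(Finished): ...
class Cancelled(Finished): ...
class Mapped(Success): ...


cleaned: List[str] = []


def clean_up(task_name: str, old_state: State, new_state: State) -> State:
    # One check covers Success, Failed and Cancelled; Mapped is skipped
    # because the parent of a mapped task never launches a job itself.
    if new_state.is_finished() and not isinstance(new_state, Mapped):
        cleaned.append(f"{task_name}: {type(new_state).__name__}")
    # Returning new_state leaves the transition unchanged.
    return new_state
```

This only tidies the handler; it still cannot react to states the handler is never called with.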
Amanda Wee
09/21/2021, 10:02 AM
import time

import prefect


def task_state_handler(task, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info(f"task {task.name} has moved from {old_state} state to {new_state} state")


@prefect.task(state_handlers=[task_state_handler])
def foo():
    logger = prefect.context.get("logger")
    logger.info("starting task foo")
    time.sleep(20)
    logger.info("ending task foo")


@prefect.task(state_handlers=[task_state_handler])
def bar():
    logger = prefect.context.get("logger")
    logger.info("starting task bar")
    time.sleep(10)
    logger.info("ending task bar")


def flow_state_handler(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    if new_state.is_finished():
        logger.info(f"test flow has finished with {new_state} state")


def main():
    with prefect.Flow("test_flow", state_handlers=[flow_state_handler]) as flow:
        t1 = foo()
        bar(upstream_tasks=[t1])
    prefect.Client().create_project(project_name="test-project")
    flow.register(project_name="test-project")


if __name__ == "__main__":
    main()
I got this log output:
Beginning Flow run for 'test_flow'
Task 'foo': Starting task run...
task foo has moved from <Pending: "Task run created"> state to <Running: "Starting task run."> state
starting task foo
Flow run has been cancelled, cancelling active tasks
test flow has finished with <Cancelled: "Flow run is cancelled"> state
This appears to confirm my hunch: your task state handler isn't triggered because the state change happens to the flow run, not to the task runs, which are interrupted when the flow is cancelled. Hence, your approach won't work. You could try creating a flow state handler, or perhaps look into `ResourceManager`:
https://docs.prefect.io/core/idioms/resource-manager.html
Lucas Beck
09/21/2021, 11:04 AM
> You could try creating a flow state handler, or perhaps look into `ResourceManager`:
> https://docs.prefect.io/core/idioms/resource-manager.html
That won't work for me, as I need to shut down k8s jobs that are created within the tasks, so I need access to the tasks' attributes.
Kevin Kho
Lucas Beck
09/21/2021, 2:02 PM