Andreas
11/03/2024, 2:37 AMMarvin
11/03/2024, 2:37 AMon_cancellation
which you can utilize to perform cleanup actions. For example, setting up the on_cancellation
state change hook allows you to execute a clean-up function when a task is cancelled. However, be aware of race conditions and limitations, such as those discussed in this GitHub issue.
4. Experimental Features: You could also enable experimental features like enhanced cancellation to ensure more reliable handling of cancellations. You can set this feature using:
bash
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
5. Generic Hooks for Cleanup: For concise code, you might want to use a generic hook parameter that handles cleanup for multiple states, such as completion, failure, or cancellation. This approach minimizes code repetition as shown in Generic flow and task hooks GitHub issue.
By setting appropriate state handlers or hooks, you can ensure that cleanup operations are executed when a cancellation occurs, helping ensure that resources are released correctly. You may also need to handle specific infrastructure-related constraints, such as with Dask or Kubernetes integrations.Andreas
11/03/2024, 2:44 AMMarvin
11/03/2024, 2:44 AMon_cancellation
hook for tasks. Instead, you can handle cancellations using Python's exception handling and Prefect's task state.
Here's a straightforward way to implement cleanup within a task function:
1. Use Try-Finally or Try-Except-Finally Blocks: You can use a try-finally
block within your task to ensure that cleanup code runs irrespective of whether the task completes successfully or is cancelled.
2. Check for Cancellations: Prefect raises an Abort
exception when a task is cancelled. You can catch and handle this exception to perform any necessary cleanup.
Here's an example:
python
from prefect import task, flow
import time
@task
def my_task():
try:
# Simulating task activity
print("Task is starting. It will sleep for 10 seconds...")
time.sleep(10)
except Exception as e:
print(f"Task was cancelled or encountered an error: {e}")
finally:
# This cleanup code will always run, even if the task is cancelled
print("Running cleanup operations...")
# Insert your cleanup logic here, such as closing files or releasing resources
@flow
def my_flow():
my_task()
# To test it, you'd trigger this flow and use cancellation features provided by Prefect
In this example, the finally
block ensures that cleanup operations are executed whether the task is cancelled or completes normally. This simple pattern allows you to clean up resources within a task context without relying on flow-level hooks.Andreas
11/03/2024, 3:18 AMMarvin
11/03/2024, 3:18 AMon_cancellation
hook similar to flows. Here are some refined recommendations:
1. Use Exception Handling: Prefect raises an internal exception to handle task cancellations, but directly relying on catching it can be challenging due to how backend task executors handle cancellations. Regular try-except blocks are often not catching these signals as they operate below application-level exceptions.
2. Use Subprocess Handling: One approach involves running the main task workload in a subprocess or in parallel, which allows for catching termination signals at the Python level:
python
from prefect import task, flow
import subprocess
import signal
import time
@task
def my_task():
def handler(signum, frame):
print("Cleanup code executed due to cancellation")
# Insert your cleanup logic here
# Attach the handler to the signal
signal.signal(signal.SIGTERM, handler)
try:
# Simulate task workload
print("Task is running")
time.sleep(10)
except Exception as e:
print(f"Exception during task execution: {e}")
finally:
print("Task cleanup completed.")
@flow
def my_flow():
my_task()
if __name__ == '__main__':
my_flow()
3. Community Suggestions: From the Prefect forums and issues, handling cancellations gracefully often involves infrastructure level automation or having cleanup delegated to other tasks or infrastructure-specific features like AWS Lambda finalizers or Kubernetes preStop Hooks.
4. Infra-Level Graceful Stops: When running tasks on an infrastructure like Kubernetes, using Pre-Stop hooks or termination grace periods defined in the deployment manifest can allow you to execute cleanup scripts even when a Prefect task does not expose direct hooks.
5. Concurrent & Asynchronous Execution: Depending on your runtime environment, using a robust asyncio pattern or threading for task internals can help implement timeouts or manual signals. However, this usually requires more control over the runtime environment.
These strategies utilize Python’s signal handling and subprocess protocols to force a cleanup sequence when cancellation occurs. Keep in mind that the precise implementation and its effectiveness will depend on the task execution environment and scheduler behavior you are using.