<@ULVA73B9P> how to handle sigterm to cleanup reso...
# marvin-in-the-wild
j
@Marvin how to handle sigterm to cleanup resources inside a prefect flow
m
Cleaning up resources in a Prefect flow when a SIGTERM is received can indeed be challenging. Unfortunately, Prefect does not have direct hooks for handling SIGTERM signals within flows. The process involves some creative workarounds. Here are the findings and potential solutions based on the resources and issues I checked:

1. Graceful Shutdown Handling for Dask: An issue was reported where Dask-related pods were left orphaned after cancelling a flow (#12976 on GitHub). This indicates a missing mechanism to propagate termination signals to resources created by the flow.
```python
import signal
import sys
import time

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import flow, get_run_logger, task
from prefect_dask.task_runners import DaskTaskRunner


def handle_sigterm(*args):
    # Signal handlers are called with (signum, frame); both are ignored here.
    logger = get_run_logger()
    logger.info("SIGTERM received. Cleaning up...")
    # Clean up resources here
    sys.exit(0)


@task()
def sql_task(task_name: str):
    logger = get_run_logger()
    logger.info(f"RUNNING: {task_name}")
    time.sleep(30)


@flow(
    name="dask_cancel_issue",
    task_runner=DaskTaskRunner(
        cluster_class=KubeCluster,
        cluster_kwargs={
            "pod_template": make_pod_spec(
                image="some_docker_image_here",
                memory_limit="4Gi",
                cpu_limit=4,
                threads_per_worker=16,
            )
        },
        adapt_kwargs={"minimum": 1, "maximum": 1},
    ),
)
def dask_cancel_issue():
    # Register the SIGTERM handler (signal.signal only works in the main thread)
    signal.signal(signal.SIGTERM, handle_sigterm)
    logger = get_run_logger()
    logger.info("Flow now starting")
    # Chain the three tasks so each one waits for the previous one
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1])
    task_3 = sql_task.with_options(name="task_3").submit(task_name="task_3", wait_for=[task_2])


if __name__ == "__main__":
    dask_cancel_issue()
```
2. Add Custom Exit Signals in Interactive Workflows: The Prefect documentation on creating interactive workflows suggests adding custom logic to check for exit signals, which may help with a controlled shutdown.

You can use Python's signal handling mechanism to intercept termination signals and take the necessary cleanup actions. This approach involves registering a signal handler and making sure your resources are cleaned up properly when a SIGTERM is received. The code snippet above illustrates a potential solution. Also review the referenced GitHub issue for deeper insight into typical challenges and community-provided solutions. For more robust solutions, I'd recommend checking the official Prefect documentation regularly for updates.

Links: Issue #12976
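The signal-handling approach described above can also be sketched without Prefect at all. This is a minimal, standard-library-only illustration, not a Prefect API: `TerminationRequested` and `run_with_cleanup` are hypothetical names, and the idea is to turn SIGTERM into an ordinary exception so that a `finally` block runs the cleanup. It assumes a POSIX system and that it is called from the main thread, since `signal.signal` only works there.

```python
import os
import signal
import time


class TerminationRequested(Exception):
    """Hypothetical exception raised when SIGTERM arrives (not a Prefect class)."""


def _raise_on_sigterm(signum, frame):
    # Convert the signal into an exception so try/finally blocks can run.
    raise TerminationRequested(f"received signal {signum}")


def run_with_cleanup(work, cleanup):
    """Run work(); always call cleanup(), even if SIGTERM interrupts work()."""
    previous = signal.signal(signal.SIGTERM, _raise_on_sigterm)
    try:
        work()
        return "completed"
    except TerminationRequested:
        return "terminated"
    finally:
        cleanup()
        # Restore whatever handler was installed before, if it can be restored.
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)
```

Inside a flow, `work` would stand in for the flow body and `cleanup` for whatever releases your external resources. Whether this interacts cleanly with Prefect's own signal handling is an assumption you should verify for your Prefect version.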
j
@Marvin Will the on_crashed state event be called on sigterm?
m
When a SIGTERM is received, it will not necessarily trigger the `on_crashed` state event directly unless the termination causes the flow to crash in an unexpected way. Here is some information that helps clarify this:

1. State Hooks Documentation: The Prefect documentation on state changes explains how to define hooks that react to state changes, such as `on_crashed`:
```python
import os

import prefect.runtime
from prefect import flow, task
from prefect.blocks.system import String


@task
def my_task_that_crashes():
    raise SystemExit("Crashing on purpose!")


async def delete_cloud_run_job(flow, flow_run, state):
    # Flow state hooks are called with the flow, the flow run, and the new state.
    # Example cleanup; "delete_job_command" is a placeholder command.
    os.system("delete_job_command")


@flow(on_crashed=[delete_cloud_run_job])
def crashing_flow():
    # Save the run name in a block so the hook could look it up later
    flow_run_name = prefect.runtime.flow_run.name
    cloud_run_job_name = String(value=flow_run_name)
    cloud_run_job_name.save(name="crashing-flow-cloud-run-job", overwrite=True)
    my_task_that_crashes()


if __name__ == "__main__":
    crashing_flow()
```
2. State Transition Rules: SIGTERM itself is not a state but a signal sent to the process to terminate. If the flow can catch the signal, it can change to a state like `CRASHED`, which subsequently triggers the `on_crashed` hook. See the Prefect documentation on states for details on state transitions. The `CRASHED` state is described as a type of terminal state indicating an unexpected failure.

Given these points, if you want to ensure a hook like `on_crashed` executes on SIGTERM, it is wise to handle SIGTERM explicitly in the flow and use the state transition as needed. Here's an example:
```python
import signal
import time

from prefect import flow, get_run_logger, task


def handle_sigterm(signum, frame):
    # Signal handler: called by Python with the signal number and stack frame.
    logger = get_run_logger()
    logger.info("SIGTERM received. Exiting so the run can be marked as crashed...")
    # Raising SystemExit mirrors the "crash on purpose" example above; whether
    # the run ends up CRASHED depends on how Prefect handles the exit.
    raise SystemExit("Received SIGTERM")


def cleanup_on_crash(flow, flow_run, state):
    # State hook: called with the flow, the flow run, and the new state.
    # Perform cleanup here.
    print(f"Flow run {flow_run.name} entered state {state.name}; cleaning up...")


@task
def my_task():
    # Simulate a long-running task
    time.sleep(60)


@flow(on_crashed=[cleanup_on_crash])
def my_flow():
    signal.signal(signal.SIGTERM, handle_sigterm)
    my_task()


if __name__ == "__main__":
    my_flow()
```
### Conclusion

To have the `on_crashed` hook called, handle the SIGTERM within your flow, transitioning it to an appropriate state like `CRASHED` if needed. By handling signals properly, you can ensure appropriate cleanup and state transitions.

For further details, refer to the Prefect documentation:
- State Change Hooks
- States

Links:
- Prefect Workflow State Transitions
- State Change Hooks in Prefect
j
@Marvin does prefect have some idea of a task cancellation token?
m
Prefect does not have a dedicated "task cancellation token" similar to what you might find in some other orchestration platforms or libraries. However, Prefect provides mechanisms to handle flow and task cancellations indirectly. Here are the key takeaways based on the search results:

1. Enhanced Cancellation: Prefect provides an enhanced cancellation feature that can be enabled to improve the cancellation experience. This can be enabled with an experimental feature flag which you can set in your configuration:
```sh
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
```
This flag makes the cancellation process more robust and attempts to handle cases where cancellation may fail.

2. Standard Cancellation Process: Prefect provides some level of control over cancellation at the flow level but not directly at the task level. The documentation notes that while the cancellation process is robust, it can face limitations if:
- The infrastructure for the flow run has been removed or altered.
- The infrastructure does not support cancellation.
- The worker cannot find the infrastructure associated with the run.

3. Cancellation via CLI: You can cancel flow runs through the Prefect CLI, and flows will handle cancellation accordingly. However, this will not directly cancel tasks within a flow unless the entire flow is terminated.

Given these points, you might use task retries and state transition hooks to handle cancellations more gracefully within tasks and flows. However, explicit task-level cancellation tokens as found in some other systems are not a current feature in Prefect. Here's an example to demonstrate how you can use state hooks to mimic cancellation:
```python
import time

from prefect import flow, get_run_logger, task


class TaskCanceled(Exception):
    """Custom exception defined here for illustration; it is not a Prefect class."""


def log_cancellation(flow, flow_run, state):
    # State hook: runs when the flow run enters a cancelled state.
    print(f"Flow run {flow_run.name} was cancelled.")


@task()
def my_task():
    for i in range(100):
        time.sleep(1)
        # Check if task should be canceled
        # Add your custom cancellation logic here if needed
        if i > 50:  # This is just an example condition
            raise TaskCanceled("Task was canceled.")


@flow(on_cancellation=[log_cancellation])  # Add your custom hook
def my_flow():
    try:
        my_task()
    except TaskCanceled:
        logger = get_run_logger()
        logger.info("Task was canceled.")


if __name__ == "__main__":
    my_flow()
```
### Additional Resources

For more details, you can refer to the full Prefect Documentation:
- Enhanced Cancellation Documentation
- Flow State Concepts

By setting up hooks and enhancing cancellation features, you can get close to the behavior of cancellation tokens.

Links:
- Prefect Documentation on Flow States
- Enhanced Cancellation Experimental Feature
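Since Prefect has no built-in cancellation token, one way to approximate the idea is cooperative cancellation in plain Python. The sketch below is only an illustration: `CancellationToken` and `long_running_step` are hypothetical names, not Prefect APIs. It wraps a `threading.Event` so that long-running work can check between small chunks whether it has been asked to stop.

```python
import threading


class CancellationToken:
    """Hypothetical helper (not part of Prefect) wrapping a threading.Event."""

    def __init__(self):
        self._event = threading.Event()

    def cancel(self):
        # Request cancellation; safe to call from another thread.
        self._event.set()

    @property
    def cancelled(self):
        return self._event.is_set()

    def wait(self, timeout):
        # Sleep for up to `timeout` seconds; return True if cancelled meanwhile.
        return self._event.wait(timeout)


def long_running_step(token, iterations=100, delay=0.01):
    """Do work in small chunks, checking the token between chunks."""
    completed = 0
    for _ in range(iterations):
        if token.wait(delay):
            break  # Cancellation requested: stop early.
        completed += 1
    return completed
```

Something still has to call `token.cancel()`, for example a SIGTERM handler or a cancellation hook. Wiring that into a Prefect task is left as an assumption to validate in your own setup.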