Joe
06/13/2024, 1:05 AM
Marvin
06/13/2024, 1:06 AM
```python
from prefect import flow, task, get_run_logger
from dask_kubernetes import make_pod_spec, KubeCluster
from prefect_dask.task_runners import DaskTaskRunner
import time
import signal
import sys


def handle_sigterm(*args):
    logger = get_run_logger()
    logger.info("SIGTERM received. Cleaning up...")
    # Clean up resources here
    sys.exit(0)


@task()
def sql_task(task_name: str):
    logger = get_run_logger()
    logger.info(f"RUNNING: {task_name}")
    time.sleep(30)


@flow(
    name="dask_cancel_issue",
    task_runner=DaskTaskRunner(
        cluster_class=KubeCluster,
        cluster_kwargs={
            "pod_template": make_pod_spec(
                image="some_docker_image_here",
                memory_limit="4Gi",
                cpu_limit=4,
                threads_per_worker=16,
            )
        },
        adapt_kwargs={"minimum": 1, "maximum": 1},
    ),
)
def dask_cancel_issue():
    signal.signal(signal.SIGTERM, handle_sigterm)  # Register SIGTERM handler
    logger = get_run_logger()
    logger.info("Flow now starting")
    task_1 = sql_task.with_options(name='task_1').submit(task_name='task_1', wait_for=[])
    task_2 = sql_task.with_options(name='task_2').submit(task_name='task_2', wait_for=[task_1])
    task_3 = sql_task.with_options(name='task_3').submit(task_name='task_3', wait_for=[task_2])


if __name__ == "__main__":
    dask_cancel_issue()
```
2. Add Custom Exit Signals in Interactive Workflows:
- The Prefect documentation on creating interactive workflows suggests adding custom logic to check for exit signals, which may help in controlled shutdown.
You can use the signal handling mechanism to intercept termination signals and take necessary cleanup actions. This approach involves registering a signal handler and ensuring your resources are cleaned properly when receiving a SIGTERM.
Please refer to the provided code snippet for an illustration of a potential solution. Also, review the referenced GitHub issue for deeper insights into typical challenges and community-provided solutions.
For more robust solutions, I'd recommend checking the official Prefect documentation regularly.
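The signal-handling approach described above can be sketched with the standard library alone. This is a minimal illustration, not Prefect's own mechanism: `ShutdownFlag` and `run_steps` are hypothetical names, and the sketch assumes it runs in the main thread, since Python only allows signal handlers to be registered there.

```python
import signal


class ShutdownFlag:
    """Hypothetical helper that records that a termination signal arrived."""

    def __init__(self):
        self.requested = False

    def __call__(self, signum, frame):
        # Keep the handler tiny: only record the request.
        self.requested = True


def run_steps(steps, flag, on_step=None):
    """Run up to `steps` units of work, stopping early once SIGTERM arrives."""
    previous = signal.signal(signal.SIGTERM, flag)  # register the handler
    completed = 0
    try:
        for i in range(steps):
            if flag.requested:
                break  # leave the loop so the cleanup below can run
            if on_step is not None:
                on_step(i)
            completed += 1
    finally:
        # Clean up resources here, then restore the previous handler.
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)
    return completed
```

Setting a flag and checking it between units of work lets the loop finish the current step and run its cleanup, instead of exiting from inside the handler at an arbitrary point.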
Links: Issue #12976
Joe
06/13/2024, 1:09 AM
Marvin
06/13/2024, 1:10 AM
`on_crashed` state event directly unless the termination causes the flow to crash in an unexpected way. Here is some information that helps clarify this:
1. State Hooks Documentation:
- The Prefect documentation on state changes explains how to define hooks that react to state changes, such as `on_crashed`:
```python
import os

from prefect import flow, task
from prefect.blocks.system import String
from prefect.runtime import flow_run


@task
def my_task_that_crashes():
    raise SystemExit("Crashing on purpose!")


# State change hooks receive the flow, the flow run, and the new state
async def delete_cloud_run_job(flow, flow_run, state):
    # Example cleanup task
    os.system("delete_job_command")


@flow(on_crashed=[delete_cloud_run_job])
def crashing_flow():
    flow_run_name = flow_run.name
    cloud_run_job_name = String(value=flow_run_name)
    cloud_run_job_name.save(name="crashing-flow-cloud-run-job", overwrite=True)
    my_task_that_crashes()


if __name__ == "__main__":
    crashing_flow()
```
2. State Transition Rules:
- SIGTERM itself is not a state but a signal sent to the process to terminate. If the flow can catch the signal, it can change to a state like `CRASHED`, which subsequently triggers the `on_crashed` hook.
- See the Prefect documentation on states for details on state transitions. The `CRASHED` state is described as a type of terminal state indicating an unexpected failure.
Given these points, if you want to ensure a hook like `on_crashed` executes on SIGTERM, it is wise to handle SIGTERM explicitly in the flow and use the state transition as needed. Here’s an example:
```python
import signal
import sys
import time

from prefect import flow, task, get_run_logger


def handle_sigterm(signum, frame):
    logger = get_run_logger()
    logger.info("SIGTERM received. Cleaning up...")
    # Exit with a non-zero code. Prefect is expected to treat the resulting
    # SystemExit as a crash, which moves the run to CRASHED.
    sys.exit(1)


def cleanup_on_crash(flow, flow_run, state):
    # State change hooks take (flow, flow_run, state), so the signal handler
    # itself cannot be reused as a hook. Perform cleanup here.
    print(f"Flow run {flow_run.name} entered state {state.name}. Cleaning up...")


@task
def my_task():
    # Simulate a long-running task
    time.sleep(60)


@flow(on_crashed=[cleanup_on_crash])
def my_flow():
    signal.signal(signal.SIGTERM, handle_sigterm)
    my_task()


if __name__ == "__main__":
    my_flow()
```
### Conclusion
To have the `on_crashed` hook called, handle the SIGTERM within your flow, transitioning it to an appropriate state like `CRASHED` if needed.
For further details, refer to the Prefect documentation:
- State Change Hooks
- States
By handling signals properly, you can ensure appropriate cleanup and state transitions.
Links:
- Prefect Workflow State Transitions
- State Change Hooks in Prefect
Joe
06/13/2024, 5:32 AM
Marvin
06/13/2024, 5:33 AM
```sh
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
```
This flag makes the cancellation process more robust and attempts to handle cases where cancellation may fail.
2. Standard Cancellation Process:
Prefect provides some control over cancellation at the flow level but not directly at the task level. According to the documentation, the cancellation process is generally robust but can fail if:
- The infrastructure for the flow run has been removed or altered.
- The infrastructure does not support cancellation.
- The worker cannot find the infrastructure associated with the run.
3. Cancellation via CLI:
You can cancel flow runs through the Prefect CLI, and flows will handle cancellation accordingly. However, this will not directly cancel tasks within a flow unless the entire flow is terminated.
Given these points, you might utilize task retries and state transition hooks to handle cancellations more gracefully within tasks and flows. However, explicit task-level cancellation tokens as found in some other systems are not a current feature in Prefect.
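For comparison, the kind of cancellation token mentioned above can be approximated with the standard library. This is only a sketch of the general pattern, not a Prefect feature: `CancellationToken` and `do_work` are hypothetical names, and the work function has to check the token cooperatively between steps.

```python
import threading
import time


class CancellationToken:
    """Hypothetical cooperative cancellation token built on threading.Event."""

    def __init__(self):
        self._event = threading.Event()

    def cancel(self):
        # Safe to call from another thread.
        self._event.set()

    @property
    def cancelled(self):
        return self._event.is_set()


def do_work(token, steps=100, step_seconds=0.01):
    """Run up to `steps` units of work, checking the token before each one."""
    completed = 0
    for _ in range(steps):
        if token.cancelled:
            break  # stop at the next safe point
        time.sleep(step_seconds)  # stand-in for one unit of real work
        completed += 1
    return completed
```

Another thread (or a timer, or a signal handler) calls `token.cancel()`, and the worker stops at its next check. Cancellation is therefore only as prompt as the interval between checks.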
Here's an example to demonstrate how you can use state hooks to mimic cancellation:
```python
import time

from prefect import flow, task, get_run_logger


class TaskCanceled(Exception):
    """Custom exception; Prefect does not provide a TaskCanceled exception."""


def some_cancellation_hook(flow, flow_run, state):
    # Placeholder hook; runs when the flow run is cancelled
    print(f"Flow run {flow_run.name} was cancelled.")


@task()
def my_task():
    for i in range(100):
        time.sleep(1)
        # Check if task should be canceled
        # Add your custom cancellation logic here if needed
        if i > 50:  # This is just an example condition
            raise TaskCanceled('Task was canceled.')


@flow(on_cancellation=[some_cancellation_hook])  # Add your custom hook
def my_flow():
    try:
        my_task()
    except TaskCanceled:
        logger = get_run_logger()
        logger.info("Task was canceled.")


if __name__ == "__main__":
    my_flow()
```
### Additional Resources
For more details, you can refer to the full Prefect Documentation:
- Enhanced Cancellation Documentation
- Flow State Concepts
By setting up hooks and enhancing cancellation features, you can get close to the behavior of cancellation tokens.
Links:
- Prefect Documentation on Flow States
- Enhanced Cancellation Experimental Feature