# ask-marvin
m
Hi @Marvin, assume we have Prefect flows with long tasks and we want to support a cancel request that stops or kills "immediately" the flow and the running tasks. What is the best way to do so? Thanks! (From my understanding, I see that the default cancel flow didn't affect the current running tasks and they continue to work. Also, when using "set_task_run_state()" or "set_flow_run_state()" it is just labeling with "cancel" not really stopping the run.)
m
Your understanding is largely right: forcing a state with set_flow_run_state() or set_task_run_state() only changes the label, and code that is already executing keeps running unless the process or infrastructure hosting it is stopped. Here's how to implement effective cancellation:
1. Use Cancellation Handlers
python
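import threading
import time

from prefect import flow, task

# Placeholder flag, not a Prefect API: the `task` decorator has no `context`
# attribute, so this sketch assumes your own code (for example a signal
# handler or another thread) sets this Event when a stop is requested.
stop_requested = threading.Event()

@task
def long_running_task():
    try:
        # Your long-running code here, split into small units of work
        while not stop_requested.is_set():
            time.sleep(1)  # Do one unit of work
    finally:
        # Clean up resources, whether the loop stopped early or raised
        pass

@flow(timeout_seconds=3600)
def my_flow():
    return long_running_task()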
2. Infrastructure-Level Cancellation
If you run flows on infrastructure such as Docker or Kubernetes, cancelling the run can stop the container or pod itself, which ends any tasks running inside it. For example, with a Docker infrastructure block:
python
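from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure import DockerContainer

# Prefect 2.x API; newer releases replace infrastructure blocks with work pools and workers
docker_block = DockerContainer(
    image="your-image:tag",
    auto_remove=True,
)

@flow
def my_flow():
    # Your flow code
    pass

# The block is attached to a deployment; @flow itself takes no infrastructure argument.
# "docker-deployment" is a placeholder name.
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="docker-deployment",
    infrastructure=docker_block,
)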
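To make "forceful" cancellation concrete, here is a minimal standard-library sketch of the usual pattern at the process level: ask the process to terminate, wait for a grace period, then kill it. It does not use Prefect and is not Prefect's implementation; the `stop_process` and `start_long_job` names, the sleeping child command, and the 2-second grace period are all assumptions made for illustration.

```python
import subprocess
import sys
import time


def stop_process(proc: subprocess.Popen, grace_seconds: float = 2.0) -> int:
    """Ask a child process to stop, then kill it if it does not exit in time.

    Hypothetical helper for illustration; returns the child's exit code.
    """
    proc.terminate()  # polite request (SIGTERM on POSIX)
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # forceful stop (SIGKILL on POSIX)
        return proc.wait()


def start_long_job() -> subprocess.Popen:
    # Stand-in for a long-running task: a child Python process that sleeps for a minute.
    return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])


if __name__ == "__main__":
    job = start_long_job()
    started = time.monotonic()
    exit_code = stop_process(job)
    print(f"stopped after {time.monotonic() - started:.2f}s, exit code {exit_code}")
```

Because the work lives in its own process, it can be stopped without any cooperation from the code inside it. The trade-off is that cleanup inside the child only runs if it handles the termination request itself.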
3. Best Practices for Cancellation:
- Implement proper cleanup handlers in your tasks
- Use context managers for resource management
- Check for cancellation signals periodically in long-running tasks
- Set appropriate timeouts on your flows and tasks
- Use infrastructure that supports cancellation (Docker, Kubernetes)
4. Limitations to Be Aware Of:
- Tasks running in processes or threads won't automatically stop
- External resources (databases, API connections) need manual cleanup
- Some infrastructure types might not support immediate cancellation
- Cancellation is not guaranteed to be immediate: it depends on when the task checks for cancellation signals
Here's a more complete example showing how to handle cancellation gracefully:
python
import threading
import time
from contextlib import contextmanager

from prefect import flow, task

# Placeholder flag, not a Prefect API: set it from your own code
# (for example a signal handler) when the run should stop.
stop_requested = threading.Event()

@contextmanager
def cleanup_on_cancel():
    try:
        yield
    finally:
        # Cleanup code here; runs on normal exit, on error, and on early stop
        print("Cleaning up resources...")

@task
def cancellable_task():
    with cleanup_on_cancel():
        try:
            while True:
                if stop_requested.is_set():
                    print("Cancellation requested, cleaning up...")
                    break

                # Your work here, in short slices
                time.sleep(1)  # Don't busy-wait

        except Exception as e:
            print(f"Error occurred: {e}")
            raise

@flow
def main_flow():
    return cancellable_task()
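The limitation that work running in a thread won't stop on its own can be seen with the standard library alone. The sketch below is plain Python rather than Prefect, and the function name and timings are assumptions: it waits on a thread with a timeout, then checks that the thread is still running after the caller has given up.

```python
import concurrent.futures
import threading


def demo_timeout_does_not_stop_thread() -> dict:
    stop = threading.Event()  # lets the demo release the worker afterwards

    def slow_work() -> str:
        stop.wait()  # stands in for a long task; blocks until released
        return "finished"

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_work)
        try:
            future.result(timeout=0.1)
            timed_out = False
        except concurrent.futures.TimeoutError:
            timed_out = True
        still_running = future.running()  # the caller gave up, the thread did not
        cancel_worked = future.cancel()   # cancel() cannot stop a call that has started
        stop.set()                        # cooperative release so the pool can shut down
        result = future.result(timeout=5)

    return {
        "timed_out": timed_out,
        "still_running": still_running,
        "cancel_worked": cancel_worked,
        "result": result,
    }


if __name__ == "__main__":
    print(demo_timeout_does_not_stop_thread())
```

The timeout only limits how long the caller waits. The work ends when the thread itself notices the stop flag, which is why periodic checks inside the task matter.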
For the most robust cancellation support:
1. Use infrastructure blocks that support cancellation (Docker/Kubernetes)
2. Implement cancellation checks in long-running tasks
3. Use proper resource cleanup with context managers
4. Set appropriate timeouts
5. Handle external resource cleanup explicitly
You can find more details about flow cancellation in the Prefect documentation. Remember that truly immediate cancellation of Python code is hard, because Python offers no safe way to forcibly stop a running thread from outside. The most reliable approach is to design your tasks to check for cancellation signals periodically and implement proper cleanup procedures, and to rely on stopping the process or container when a hard stop is required.
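As a rough illustration of the periodic-check advice, the sketch below uses only the standard library. It is not a Prefect API, and the names `cancellable_worker` and `run_and_cancel` and the timings are assumptions. The worker pauses with `Event.wait` instead of `time.sleep`, so a stop request wakes it immediately rather than after the full interval.

```python
import threading
import time


def cancellable_worker(stop: threading.Event, log: list) -> None:
    """Do work in short slices and stop as soon as `stop` is set."""
    try:
        while not stop.is_set():
            log.append("work")  # one small unit of work
            # Event.wait returns early when the flag is set, unlike time.sleep,
            # so a stop request does not have to wait out the 5-second interval.
            stop.wait(timeout=5.0)
    finally:
        log.append("cleanup")  # always runs, including on early stop


def run_and_cancel(delay: float = 0.2) -> tuple:
    """Start the worker, request a stop after `delay` seconds, time the shutdown."""
    stop = threading.Event()
    log: list = []
    worker = threading.Thread(target=cancellable_worker, args=(stop, log))
    worker.start()
    time.sleep(delay)
    requested = time.monotonic()
    stop.set()
    worker.join()
    return log, time.monotonic() - requested


if __name__ == "__main__":
    log, latency = run_and_cancel()
    print(log, f"stopped {latency:.3f}s after the request")
```

How quickly the worker stops is bounded by the length of one unit of work, so shorter slices give faster cancellation at the cost of more frequent checks.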