Mohammad Nassar
07/01/2025, 9:07 AMMarvin
07/01/2025, 9:07 AMMarvin
07/01/2025, 9:10 AMMarvin
07/01/2025, 9:11 AMpython
from prefect import flow, task
import signal
@task
def long_running_task():
try:
# Your long-running code here
while True:
# Check for cancellation
if task.context.get("cancelled"):
# Clean up resources
break
# Do work
except Exception:
# Handle cleanup
raise
@flow(timeout_seconds=3600)
def my_flow():
return long_running_task()
2. Infrastructure-Level Cancellation
If you're using infrastructure like Docker or Kubernetes, you can configure your infrastructure block to support forceful cancellation:
python
from prefect.infrastructure import DockerContainer
docker_block = DockerContainer(
image="your-image:tag",
auto_remove=True,
force_pull=False,
)
@flow(infrastructure=docker_block)
def my_flow():
# Your flow code
pass
3. Best Practices for Cancellation:
- Implement proper cleanup handlers in your tasks
- Use context managers for resource management
- Check for cancellation signals periodically in long-running tasks
- Set appropriate timeouts on your flows and tasks
- Use infrastructure that supports cancellation (Docker, Kubernetes)
4. Limitations to Be Aware Of:
- Tasks running in processes or threads won't automatically stop
- External resources (databases, API connections) need manual cleanup
- Some infrastructure types might not support immediate cancellation
- Cancellation is not guaranteed to be immediate - it depends on when the task checks for cancellation signals
Here's a more complete example showing how to handle cancellation gracefully:
python
from prefect import flow, task
import time
from contextlib import contextmanager
@contextmanager
def cleanup_on_cancel():
try:
yield
finally:
# Cleanup code here
print("Cleaning up resources...")
@task
def cancellable_task():
with cleanup_on_cancel():
try:
while True:
if task.context.get("cancelled"):
print("Cancellation requested, cleaning up...")
break
# Your work here
time.sleep(1) # Don't busy-wait
except Exception as e:
print(f"Error occurred: {e}")
raise
@flow
def main_flow():
return cancellable_task()
For the most robust cancellation support:
1. Use infrastructure blocks that support cancellation (Docker/Kubernetes)
2. Implement cancellation checks in long-running tasks
3. Use proper resource cleanup with context managers
4. Set appropriate timeouts
5. Handle external resource cleanup explicitly
You can find more details about flow cancellation in the Prefect documentation.
Remember that truly immediate cancellation of Python code is challenging due to the language's cooperative multitasking nature. The best approach is to design your tasks to periodically check for cancellation signals and implement proper cleanup procedures.