Jeffrey Macks
06/30/2025, 7:32 PM
@flow(
    timeout_seconds=5,
    retries=0,
    retry_delay_seconds=0,
)
Marvin
06/30/2025, 7:33 PM
The timeout_seconds parameter is meant to cancel all running tasks when the flow exceeds its timeout. However, there are some known limitations:
The inconsistent behavior you're seeing (some tasks canceling, others continuing) can occur due to a few factors:
- Tasks that are already executing might not respond immediately to cancellation signals
- Some task runners (like Ray) may not properly enforce timeouts
- Tasks running in separate processes or distributed environments might not receive the cancellation signal properly
To improve this situation, here are some recommendations:
1. Instead of relying solely on flow-level timeouts, implement task-level timeouts with proper error handling:
python
from prefect import flow, task
from prefect.exceptions import TaskTimeout

@task(timeout_seconds=2)
def my_task():
    # Task logic here
    pass

@flow(timeout_seconds=5)
def my_flow():
    try:
        results = my_task()
    except TaskTimeout:
        # Handle timeout gracefully
        pass
2. For critical operations that must be cancelled cleanly, consider implementing your own timeout mechanism using asyncio or threading timeouts within the task itself.
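As a rough illustration of that approach, here is a minimal sketch of a task that enforces its own timeout with asyncio.wait_for; the _do_work coroutine, the 3-second budget, and the task name are illustrative placeholders, not code from this thread:
python
import asyncio
from prefect import task

async def _do_work(seconds: int):
    # Placeholder for the real work; any awaitable will do
    await asyncio.sleep(seconds)

@task(retries=0)
async def bounded_task(seconds: int):
    try:
        # Enforce a timeout inside the task itself, independent of the flow-level timeout
        await asyncio.wait_for(_do_work(seconds), timeout=3)
    except asyncio.TimeoutError:
        # Clean up or log here, then re-raise (or return a sentinel value)
        raise
Because the timeout is raised inside the task, you can decide per task how to clean up or report the failure.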
3. If you need to ensure proper cleanup, consider using flow-level exception handlers to perform cleanup operations when timeouts occur:
python
@flow(timeout_seconds=5)
def my_flow():
    try:
        # Flow logic here
        pass
    except TimeoutError:
        # Perform cleanup operations
        pass
    finally:
        # Always execute cleanup code
        pass
4. If you're using a specific task runner (like Ray), check if there are known issues with timeout handling for that runner.
This is a known area where behavior can be inconsistent, particularly in distributed environments. If you need guaranteed timeout behavior, you might need to implement additional safeguards at the task level or within your task code itself.
Would you be able to share what type of task runner you're using (if any) and the nature of the tasks that aren't properly canceling? This would help provide more specific guidance for your use case.
Jeffrey Macks
06/30/2025, 7:40 PM
@task(retries=0)
async def wait_awhile(
    seconds: int,
    logger: logging.Logger | logging.LoggerAdapter[Any],
):
    logger.info(f"Waiting for {seconds} seconds...")
    await asyncio.sleep(seconds)
    logger.info("Wait completed.")
here is an example of a task that does not cancel or only cancels after running multiple times:
for name in names:
    task(process_names).with_options(
        task_run_name=f"processed_name_{_name}",
        persist_result=True,
        retries=0,
        timeout_seconds=5,
    )(
        logger=logger,
    )
Marvin
06/30/2025, 7:43 PM
1. First, define process_names as a proper async task:
python
@task(retries=0)
async def process_names(name: str, logger: logging.Logger):
    logger.info(f"Processing name: {name}")
    result = ...  # Your processing logic here
    return result
2. Then, modify your flow to handle multiple tasks correctly:
python
@flow(timeout_seconds=5)
async def my_flow(names: List[str]):
    logger = get_run_logger()

    # Option 1: Using asyncio.gather for concurrent execution
    tasks = [
        process_names.with_options(
            name=f"processed_name_{name}",
            persist_result=True,
            retries=0,
            timeout_seconds=5,
        )(name=name, logger=logger)
        for name in names
    ]
    # This will properly handle cancellation
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Option 2: If you need sequential execution
    for name in names:
        try:
            await process_names.with_options(
                name=f"processed_name_{name}",
                persist_result=True,
                retries=0,
                timeout_seconds=5,
            )(name=name, logger=logger)
        except Exception as e:
            logger.error(f"Error processing {name}: {e}")
Key improvements in this approach:
1. Use async/await consistently throughout your code
2. Properly handle task submission and awaiting
3. Use exception handling to catch and handle timeouts gracefully
4. Use asyncio.gather if you want concurrent execution with proper cancellation handling
The reason your first example (wait_awhile) works better is that it's using proper async primitives that can respond to cancellation signals. The second example might be creating tasks in a way that doesn't properly integrate with Prefect's cancellation mechanisms.
Also, ensure that any IO operations or long-running processes within your tasks are themselves cancellation-aware. If you're doing CPU-bound work or using blocking IO, you might need to add periodic cancellation checks:
python
@task(retries=0)
async def process_names(name: str, logger: logging.Logger):
    logger.info(f"Processing name: {name}")
    # If doing CPU-bound work, periodically check for cancellation
    for chunk in work_chunks:
        # Periodically yield control back to the event loop
        await asyncio.sleep(0)
        process_chunk(chunk)
    return result
This pattern will ensure that your tasks can properly respond to cancellation signals from the parent flow's timeout.
Jeffrey Macks
06/30/2025, 8:03 PM
Marvin
06/30/2025, 8:07 PM
1. If you want to wait for all tasks and collect any errors:
python
@flow
async def my_flow(names: List[str]):
    logger = get_run_logger()

    tasks = [
        process_names.with_options(
            name=f"processed_name_{name}",
            retries=0,
            timeout_seconds=5,
        )(name=name, logger=logger)
        for name in names
    ]
    # Will wait for all tasks and collect any errors
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Check for any failures
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            logger.error(f"Failed to process {name}: {result}")
2. If you want to process as many as possible and don't need to wait for all:
python
@flow
async def my_flow(names: List[str]):
    logger = get_run_logger()

    async with asyncio.TaskGroup() as tg:
        for name in names:
            tg.create_task(
                process_names.with_options(
                    name=f"processed_name_{name}",
                    retries=0,
                    timeout_seconds=5,
                )(name=name, logger=logger)
            )
    # TaskGroup will wait for all tasks to complete or fail
3. If you need to control concurrency (recommended for storage writes):
python
from prefect.tasks import task_input_hash
from datetime import timedelta

@flow
async def my_flow(names: List[str]):
    logger = get_run_logger()

    # Process in chunks to limit concurrent storage writes
    chunk_size = 5  # Adjust based on your storage system's capacity
    for i in range(0, len(names), chunk_size):
        chunk = names[i:i + chunk_size]
        tasks = [
            process_names.with_options(
                name=f"processed_name_{name}",
                retries=0,
                timeout_seconds=5,
                # Add cache key if needed to prevent duplicate writes
                cache_key_fn=task_input_hash,
                cache_expiration=timedelta(hours=1),
            )(name=name, logger=logger)
            for name in chunk
        ]
        # Process each chunk concurrently
        await asyncio.gather(*tasks, return_exceptions=True)
4. If you need to ensure atomic writes or handle storage conflicts:
python
from prefect.blocks.system import String
from prefect import flow, task, get_run_logger

@task(retries=0)
async def process_names(name: str, logger: logging.Logger):
    # Use Prefect blocks for atomic storage operations
    storage_block = await String.load("your-storage-block")
    try:
        # Your storage write logic here
        await storage_block.set(f"processed_{name}", "some_value")
        logger.info(f"Successfully wrote {name} to storage")
    except Exception as e:
        logger.error(f"Failed to write {name}: {e}")
        raise

@flow
async def my_flow(names: List[str]):
    logger = get_run_logger()
    tasks = [process_names(name, logger) for name in names]
    await asyncio.gather(*tasks, return_exceptions=True)
Key considerations for storage writes:
1. Concurrency Control: Be careful about how many concurrent writes you attempt. Storage systems often have limits (a semaphore-based sketch follows this list).
2. Error Handling: Storage operations can fail, so proper error handling is crucial.
3. Atomicity: If you need atomic operations, consider using Prefect blocks or appropriate locking mechanisms.
4. Idempotency: Consider adding cache keys to prevent duplicate writes if your flow reruns.
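As one sketch of the concurrency-control point above (plain asyncio, no Prefect-specific API): an asyncio.Semaphore caps in-flight writes at a fixed number. The limit of 5 and the write_name coroutine are placeholders you would swap for your actual task calls:
python
import asyncio
from typing import List

MAX_CONCURRENT_WRITES = 5  # illustrative limit; tune to your storage system

async def write_name(name: str):
    # Stand-in for the real storage write
    await asyncio.sleep(0.1)

async def bounded_write(name: str, sem: asyncio.Semaphore):
    async with sem:
        # Only MAX_CONCURRENT_WRITES of these bodies run at once
        await write_name(name)

async def write_all(names: List[str]):
    sem = asyncio.Semaphore(MAX_CONCURRENT_WRITES)
    await asyncio.gather(*(bounded_write(n, sem) for n in names), return_exceptions=True)
Unlike fixed-size chunks, a semaphore keeps the pipeline full: a new write starts as soon as any in-flight write finishes.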
The choice between these patterns depends on your specific requirements:
- How important is it to process all items?
- What happens if some writes fail?
- Does order matter?
- What are the concurrency limits of your storage system?