Pascal Jahan Elahi (Pawsey)
01/20/2025, 8:16 AMMarvin
01/20/2025, 8:16 AMtimeout
utility. This utility helps in enforcing a timeout on the execution of a task. If the task execution exceeds the specified timeout duration, a TaskRunTimeoutError
will be raised.
Here's a concise example of how you might implement this:
python
from prefect import task, flow
from prefect.utilities.timeout import timeout
@task
def my_task():
# Your task logic here
pass
@flow
def my_flow():
with timeout(seconds=10):
my_task()
my_flow()
In this example, the timeout
context manager is used to enforce a 10-second timeout on my_task
. If my_task
exceeds this duration, it will be interrupted, and a TaskRunTimeoutError
will be raised.
For more details, you can refer to the Prefect 3.x documentation on writing tasks.
Do note that the concept of agents has been replaced by workers in Prefect 3.x, and work pools replace the infra blocks from prefect.infrastructure
. Ensure that your environment is set up accordingly.Pascal Jahan Elahi (Pawsey)
01/20/2025, 8:16 AMMarvin
01/20/2025, 8:17 AMtimeout_async
utility. This utility is designed to handle timeouts specifically for asynchronous contexts.
Here's an example of how you might implement this:
python
from prefect import task, flow
from prefect.utilities.timeout import timeout_async
@task
async def my_async_task():
# Your async task logic here
pass
@flow
async def my_async_flow():
async with timeout_async(seconds=10):
await my_async_task()
await my_async_flow()
In this example, the timeout_async
context manager is used to enforce a 10-second timeout on my_async_task
. If the task execution exceeds this duration, it will be interrupted, and a timeout exception will be raised.
For more details, you can refer to the Prefect 3.x documentation on writing tasks.Pascal Jahan Elahi (Pawsey)
01/20/2025, 8:17 AMMarvin
01/20/2025, 8:18 AM.submit
with a specific timeout in Prefect 3.x, you can use the timeout_async
utility within your task or flow. The submit
method allows you to execute tasks asynchronously, and you can handle the timeout within the task execution itself.
Here's a conceptual example of how you might implement this:
python
from prefect import task, flow
from prefect.utilities.timeout import timeout_async
@task
async def my_async_task():
# Your async task logic here
pass
@flow
async def my_async_flow():
future = my_async_task.submit()
# Use timeout_async to handle the timeout for the task execution
async with timeout_async(seconds=10):
await future.result()
await my_async_flow()
In this example, my_async_task.submit()
submits the task for asynchronous execution, and timeout_async
is used to enforce a 10-second timeout on the task's completion. If the task execution exceeds this duration, it will be interrupted.
For more details on using task runners and futures, you can refer to the Prefect 3.x documentation on task runners.Pascal Jahan Elahi (Pawsey)
01/20/2025, 8:20 AMMarvin
01/20/2025, 8:20 AMtimeout_async
exception in Prefect 3.x, you can use a try-except block within your asynchronous flow or task. The timeout_async
utility raises a TimeoutError
when the specified duration is exceeded. Here's how you can handle it:
python
from prefect import task, flow
from prefect.utilities.timeout import timeout_async
@task
async def my_async_task():
# Your async task logic here
pass
@flow
async def my_async_flow():
try:
async with timeout_async(seconds=10):
await my_async_task()
except TimeoutError as exc:
# Handle the timeout exception
print(f"Task failed due to timeout: {exc}")
await my_async_flow()
In this example, if my_async_task
exceeds the 10-second limit, the TimeoutError
is caught in the except block, allowing you to handle the timeout gracefully.Pascal Jahan Elahi (Pawsey)
01/20/2025, 2:18 PMMarvin
01/20/2025, 2:18 PM.submit
with a specific timeout and cancel the task if a timeout exception is raised in Prefect 3.x, you can use the timeout_async
utility along with the cancel
method on the future returned by submit
. Here's how you can do it:
python
from prefect import task, flow
from prefect.utilities.timeout import timeout_async
from prefect.futures import PrefectFuture
@task
async def my_async_task():
# Your async task logic here
pass
@flow
async def my_async_flow():
future: PrefectFuture = my_async_task.submit()
try:
async with timeout_async(seconds=10):
await future.result()
except TimeoutError as exc:
print(f"Task failed due to timeout: {exc}")
future.cancel() # Cancel the task if a timeout occurs
await my_async_flow()
In this example, my_async_task.submit()
submits the task for asynchronous execution. The timeout_async
utility enforces a 10-second timeout. If the task does not complete within this time, a TimeoutError
is raised, which you can catch and handle by calling future.cancel()
to cancel the task.Pascal Jahan Elahi (Pawsey)
01/20/2025, 2:19 PMMarvin
01/20/2025, 2:19 PM.submit
with a specific timeout and cancel the task if an exception is raised, you can use the timeout_seconds
parameter in the task definition. Unfortunately, the direct cancellation of tasks is not as straightforward as in Prefect 3.x, but you can manage exceptions and timeouts at the flow level.
Here's how you can set a timeout and handle exceptions:
python
from prefect import task, flow
import asyncio
@task(timeout_seconds=10)
async def my_async_task():
# Your async task logic here
await asyncio.sleep(15) # Simulate a long-running task
@flow
async def my_async_flow():
try:
await my_async_task.submit()
except Exception as exc:
print(f"Task failed due to an exception: {exc}")
# Handle task cancellation logic here if necessary
asyncio.run(my_async_flow())
In this example, the timeout_seconds
parameter is used to specify a timeout for my_async_task
. If the task exceeds this duration, a timeout exception is raised. You can catch this exception in the flow and implement any necessary cancellation logic. However, Prefect 2.x does not provide a built-in method to cancel tasks programmatically; you may need to handle such logic externally or at the flow level.
For more details on task execution and handling in Prefect 2.x, refer to the Prefect 2.x task concepts.