<@ULVA73B9P> I have a long running task that loops...
# ask-marvin
k
@Marvin I have a long-running task that loops through a long list. Occasionally this task fails during the loop. How would you recommend I save the last entry processed so that, on a retry, the task does not iterate over the records already processed in the previous run?
m
To save progress in a long-running task that loops through a list, you could implement a custom state handler that records the last entry processed before the task fails; on retry, the task can read this saved state and skip entries that were already processed. Alternatively, you could checkpoint within the task itself, periodically writing the last processed entry to persistent storage such as a file on disk or a database. Regrettably, the search didn't yield specific code examples for your case, but I did find an example of running asynchronous tasks with Prefect, though it doesn't directly address saving state and resuming from the last entry.
import asyncio
from prefect import flow

async def some_async_function():
    await asyncio.sleep(1)
    return "I'm an async function"

@flow(log_prints=True) # send `print` statements as INFO logs
async def my_async_workflow():
    print("I can call any python code here, including prefect tasks or flows")
    await some_async_function()

if __name__ == "__main__":
    asyncio.run(my_async_workflow()) # run it like a normal async python function
You might want to look into Prefect's state handlers or the section on managing flow runs for more information. If easier solutions don't emerge, let me know and we can endure the tedious work of searching GitHub issues for similar cases or concoct a solution from scratch. Heaven forbid we must resort to optimism and creativity, though I suppose even in a universe as unyieldingly vast as ours, such atrocities might occur.
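To make the checkpointing idea above concrete, here is a minimal sketch, assuming a local JSON file as the persistent store and a hypothetical process_record helper; any durable backend (a database row, a GCS object) could take the file's place.

import json
from pathlib import Path

from prefect import flow, task

CHECKPOINT = Path("checkpoint.json")  # stand-in for any persistent store

def load_last_index() -> int:
    # Index of the last successfully processed record, or -1 if none yet.
    if CHECKPOINT.exists():
        return json.loads(CHECKPOINT.read_text())["last_index"]
    return -1

def save_last_index(index: int) -> None:
    CHECKPOINT.write_text(json.dumps({"last_index": index}))

def process_record(record) -> None:
    ...  # hypothetical per-record work

@task(retries=3)
def process_all(records: list) -> None:
    start = load_last_index() + 1  # skip records finished in earlier attempts
    for i in range(start, len(records)):
        process_record(records[i])
        save_last_index(i)  # persist progress after each record
    CHECKPOINT.unlink(missing_ok=True)  # clear checkpoint once the full list completes

@flow(log_prints=True)
def my_workflow(records: list) -> None:
    process_all(records)
    print("all records processed")

if __name__ == "__main__":
    my_workflow(list(range(1_000)))

On each retry Prefect re-runs the task body from the top, so loading the checkpoint on entry and saving it after every record is what lets the loop resume instead of starting over.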
k
@Marvin Perfect. Would you recommend using GCS or BigQuery for this persistent storage?
m
The search results suggest that Prefect does have mechanisms to work with different storage solutions, including GCS (Google Cloud Storage) and BigQuery, through the concept of storage blocks. However, whether you should use GCS or BigQuery for persistent storage largely depends on your specific use case:
- Google Cloud Storage (GCS): ideal for storing files, such as checkpoints or logs, that you want to read from or write to during task execution. It's useful when you need to save the last processed entry as a simple string or a serializable object.
- BigQuery: better suited for querying large datasets, and might be overkill if you're only looking to store and retrieve simple state information. It's also typically more involved to set up than GCS for such a use case, although it provides more powerful data analytics capabilities if needed.
If you are simply storing the last entry your task processed before failure, GCS is recommended owing to its simplicity and ease of use for storing and retrieving small amounts of data. You can create a file in GCS to save the latest processed record, then retrieve it to resume your task from where it left off after a failure. For integrating with either service, you can use Prefect's storage blocks and configure them accordingly. Keep in mind that while GCS is a straightforward option for storing simple state data, BigQuery could be more appropriate if your task involves complex data analytics and you want to leverage SQL queries on the state information. Here is the Prefect documentation on storage for further reference.
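As a hedged illustration of the GCS option, the sketch below uses the google-cloud-storage client directly rather than a Prefect block; the bucket name and object path are placeholders, and credentials are assumed to come from the environment (e.g. GOOGLE_APPLICATION_CREDENTIALS).

from google.cloud import storage  # pip install google-cloud-storage

BUCKET_NAME = "my-prefect-checkpoints"      # placeholder bucket name
BLOB_NAME = "long-task/last_processed.txt"  # placeholder object path

def save_last_processed(value: str) -> None:
    # Overwrite the checkpoint object with the latest processed entry.
    client = storage.Client()
    client.bucket(BUCKET_NAME).blob(BLOB_NAME).upload_from_string(value)

def load_last_processed() -> str | None:
    # Return the saved entry, or None if no checkpoint exists yet.
    client = storage.Client()
    blob = client.bucket(BUCKET_NAME).blob(BLOB_NAME)
    return blob.download_as_text() if blob.exists() else None

These two helpers could replace the local-file functions in the earlier checkpointing sketch without changing the loop itself.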
k
@Marvin Will a hook function passed to the on_cancellation argument of a task be executed when the task fails with this error?
"""
Encountered exception during execution: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2099, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 295, in aresult
    raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
"""
m
The search results did not directly address whether a function passed to the on_cancellation argument of a task would be executed when a task fails due to a CancelledError. However, on_cancellation is designed to trigger when a run is canceled intentionally, rather than when it fails due to an exception. Typically, the on_failure hook is used to execute a function when a task encounters an exception and fails, whereas the on_cancellation hook is intended for when a task run is canceled (intentionally terminated before completion).
Based on the traceback you provided, the error indicates that the task was canceled (via CancelledError). This could happen if you have logic that cancels the task under certain conditions, or due to an infrastructure issue causing the cancellation. Therefore, if you have set the on_cancellation hook, it should be executed when the task is canceled as indicated by the error you've encountered.
It is worth noting that Prefect's documentation mentions work on improved cancellation handling; you can enable this experimental feature with the PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION setting on your workers or agents. For further specifics, it may help to review the Prefect documentation on task lifecycle hooks to understand how on_cancellation behaves, as well as the section on enhanced cancellation if you run into issues with the current cancellation experience.
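For reference, here is a hedged sketch of how state-change hooks are typically attached in Prefect 2.x: on_failure on the task and on_cancellation on the flow. The hook names are illustrative, and whether a task itself accepts on_cancellation can depend on your Prefect version, so verify against your installed release.

from prefect import flow, task

def notify_failure(task, task_run, state):
    # Runs when the task run finishes in a Failed state.
    print(f"Task run {task_run.name} failed: {state.message}")

def notify_cancellation(flow, flow_run, state):
    # Runs when the flow run is cancelled.
    print(f"Flow run {flow_run.name} was cancelled")

@task(on_failure=[notify_failure])
def fragile_task():
    raise RuntimeError("boom")

@flow(on_cancellation=[notify_cancellation], log_prints=True)
def my_flow():
    fragile_task()

If you want to try the enhanced cancellation behaviour mentioned above, the experimental setting can be enabled on the worker or agent with: prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True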