Esteban Vásquez Giraldo
06/07/2024, 2:43 PM

Marvin
06/07/2024, 2:43 PM

…`persist_result` option and adjust the default behavior using settings like `PREFECT_RESULTS_PERSIST_BY_DEFAULT`.
For more details, you can refer to the Prefect Documentation on Result Persistence.
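For instance, the default mentioned above can be changed globally via the Prefect CLI (a minimal sketch; whether you set it via CLI or an environment variable depends on your deployment):

```shell
# Persist task results by default, without setting persist_result on each task
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
```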
### Using Cache
Cache is about temporarily storing the result of a task to speed up later executions by reusing the previous results rather than performing the task again. Here are some key points:
1. Cache Keys: A unique key is associated with each cached result. When the same task is requested to run, Prefect checks if there's a cached result with a matching key and reuses it.
2. Configuration: You can configure caching with options like:
- `cache_key_fn`: Function to compute the cache key.
- `cache_expiration`: Duration for which the cache is valid.
- `refresh_cache`: Whether to force-refresh the cache for specific task runs.
3. Storage: Cached results are kept in-memory during a flow run and typically persisted to a local storage path specified by the `PREFECT_LOCAL_STORAGE_PATH` setting.
4. Use Cases: Caching is particularly useful for expensive operations that produce the same result when given the same inputs. Prefect Cloud stores only the cache key, not the data itself.
5. Example: Using `cache_key_fn` and `cache_expiration` to cache the results of a web request task:
```python
from datetime import timedelta

import httpx

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_url(url: str, params: dict | None = None):
    response = httpx.get(url, params=params)
    response.raise_for_status()
    return response.json()
```
For more details, you can refer to the Prefect Documentation on Task Caching.
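To make the mechanism concrete, here is a plain-Python sketch of what `cache_key_fn` plus `cache_expiration` do conceptually. This is not Prefect's actual implementation: `input_hash` and `cached_call` are hypothetical stand-ins for `task_input_hash` and the task-run cache lookup.

```python
import hashlib
import json
import time

# Hypothetical stand-in for Prefect's cache store: key -> (expires_at, result)
_cache = {}

def input_hash(parameters: dict) -> str:
    # Stable hash of the task's inputs, similar in spirit to task_input_hash
    payload = json.dumps(parameters, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()

def cached_call(fn, parameters: dict, expiration_s: float):
    key = input_hash(parameters)
    entry = _cache.get(key)
    if entry and entry[0] > time.time():
        return entry[1]  # cache hit: reuse the previous result
    result = fn(**parameters)
    _cache[key] = (time.time() + expiration_s, result)
    return result

def double(x):
    return x * 2

print(cached_call(double, {"x": 21}, expiration_s=3600))  # computes: 42
print(cached_call(double, {"x": 21}, expiration_s=3600))  # cache hit: 42
```

The second call returns without executing `double` again, which is exactly the behavior described in point 4 for expensive operations with repeated inputs.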
In summary:
- Persistence is about long-term storage and is necessary for some features like retries.
- Cache is about short-term storage to enhance efficiency for repeated operations.

Esteban Vásquez Giraldo
06/07/2024, 2:46 PM

Marvin
06/07/2024, 2:48 PM

…use the `cache_key_fn` parameter in the `@task` decorator to specify the cache key function.
3. Configure Cache Expiration: Optionally, set a cache expiration time to define how long the cache should be valid.
Here is an example:
```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

# Define a cache key function that creates a unique key based on the task inputs
def unique_cache_key(context, parameters):
    return f"{context.task_run.flow_run_id}-{task_input_hash(context, parameters)}"

@task(cache_key_fn=unique_cache_key, cache_expiration=timedelta(days=1))
def my_task(x):
    # Simulate an expensive operation
    print(f"Running task with input {x}")
    return x * 2

@flow
def my_flow():
    results = []
    for i in range(5):
        results.append(my_task(i))
    return results

# Run the flow
if __name__ == "__main__":
    my_flow()

# Re-running my_flow will reuse the cached results for my_task if the inputs have not changed
```
### Explanation:
1. Cache Key Function (`unique_cache_key`): This function creates a unique cache key by combining the flow run ID and the hashed task inputs.
2. Task Configuration (`my_task`): The `cache_key_fn` parameter specifies the cache key function, and `cache_expiration` sets the validity period of the cache.
3. Flow Definition (`my_flow`): This flow runs multiple tasks, and on subsequent retries, Prefect will use the cached results of tasks that have completed successfully.
By setting up caching with a unique cache key based on task inputs, Prefect will skip re-running tasks that have already completed successfully, even when retrying the entire flow. You can adjust the cache expiration according to your needs to ensure the cache remains valid for the required duration.
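To see why prefixing the key with the flow run ID scopes reuse to a single flow run, here is a plain-Python sketch. The key format mirrors the `"<flow_run_id>-<input hash>"` shape used above; `input_hash` is a hypothetical stand-in for `task_input_hash`.

```python
import hashlib
import json

def input_hash(parameters: dict) -> str:
    # Stand-in for prefect.tasks.task_input_hash: a stable hash of the inputs
    return hashlib.sha256(json.dumps(parameters, sort_keys=True).encode()).hexdigest()

def unique_cache_key(flow_run_id: str, parameters: dict) -> str:
    # Same shape as the cache key function above: "<flow_run_id>-<input hash>"
    return f"{flow_run_id}-{input_hash(parameters)}"

# Within one flow run, identical inputs produce the same key, so a retry skips the task...
assert unique_cache_key("run-A", {"x": 1}) == unique_cache_key("run-A", {"x": 1})
# ...but a new flow run gets fresh keys, so tasks re-execute even for unchanged inputs.
assert unique_cache_key("run-A", {"x": 1}) != unique_cache_key("run-B", {"x": 1})
```

If you want cached results shared across flow runs instead, drop the flow run ID from the key and use the input hash alone.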
For more details, refer to the Prefect Documentation on Task Caching.

Esteban Vásquez Giraldo
06/07/2024, 2:50 PM

Marvin
06/07/2024, 2:50 PM

```python
from datetime import timedelta

from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.tasks import task_input_hash

# Define a LocalFileSystem persistent storage
storage = LocalFileSystem(basepath="/path/to/storage")

# Define a cache key function that uses task inputs and flow run ID
def cache_key_function(context, parameters):
    return f"{context.task_run.flow_run_id}-{task_input_hash(context, parameters)}"

@task(
    persist_result=True,                # Enable result persistence
    result_storage=storage,             # Specify storage location
    cache_key_fn=cache_key_function,    # Set cache key function
    cache_expiration=timedelta(days=1), # Set cache expiration
)
def my_task(x):
    # Simulate an expensive operation
    print(f"Running task with input {x}")
    return x * 2

@flow
def my_flow():
    results = []
    for i in range(5):
        results.append(my_task(i))
    return results

# Run the flow
if __name__ == "__main__":
    my_flow()

# Re-running my_flow will reuse the cached results for my_task if the inputs have not changed
```
### Explanation
1. Persistent Storage (`LocalFileSystem`): A persistent storage setup with a specified base path. You can use other storage options (e.g., S3, Azure Blob Storage) based on your infrastructure.
2. Task Configuration (`my_task`):
- `persist_result=True`: Enable result persistence for the task.
- `result_storage=storage`: Specify the storage location for persisting the results.
- `cache_key_fn=cache_key_function`: Use a cache key function to generate unique keys.
- `cache_expiration=timedelta(days=1)`: Define the cache expiration period.
With this setup, even if your container terminates, the persisted task results will remain available at the specified storage location. When the flow runs again, Prefect will use the cached results if the task inputs have not changed and the results are still valid according to the cache expiration policy.
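The survival-across-restarts idea can be sketched in plain Python: results written under a base path, keyed by their cache key, can be read back by a later process. This is only an illustration of the concept, not Prefect's actual storage layout or serialization format.

```python
import json
import pathlib
import tempfile

def persist_result(basepath: pathlib.Path, key: str, result) -> pathlib.Path:
    # Write the result under the storage base path, named by its cache key
    path = basepath / f"{key}.json"
    path.write_text(json.dumps(result))
    return path

def load_result(basepath: pathlib.Path, key: str):
    # A later process (or container) can read the result back by key
    return json.loads((basepath / f"{key}.json").read_text())

base = pathlib.Path(tempfile.mkdtemp())
persist_result(base, "my-key", {"x": 2, "doubled": 4})
print(load_result(base, "my-key"))  # {'x': 2, 'doubled': 4}
```

Because the files live outside the process, anything that outlasts the container (a mounted volume locally, or object storage like S3) makes the results durable.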
For more details, refer to the Prefect Documentation on Result Persistence.

Esteban Vásquez Giraldo
06/07/2024, 5:03 PM

Marvin
06/07/2024, 5:03 PM

…the `S3` storage class. Here’s how you can modify the example to use S3 for storing task results:
### Step-by-Step Guide
1. Set up S3 Storage: Use the `S3` storage class to define the storage location.
2. Set Up AWS Credentials: Ensure that your AWS credentials are set up properly.
### Example Code
Here is the modified code:
```python
from datetime import timedelta

from prefect import flow, task
from prefect.filesystems import S3
from prefect.tasks import task_input_hash

# Define S3 persistent storage
s3_storage = S3(bucket_path="my-bucket/path/to/storage")

# Define a cache key function that uses task inputs and flow run ID
def cache_key_function(context, parameters):
    return f"{context.task_run.flow_run_id}-{task_input_hash(context, parameters)}"

@task(
    persist_result=True,                # Enable result persistence
    result_storage=s3_storage,          # Specify storage location
    cache_key_fn=cache_key_function,    # Set cache key function
    cache_expiration=timedelta(days=1), # Set cache expiration
)
def my_task(x):
    # Simulate an expensive operation
    print(f"Running task with input {x}")
    return x * 2

@flow
def my_flow():
    results = []
    for i in range(5):
        results.append(my_task(i))
    return results

# Run the flow
if __name__ == "__main__":
    my_flow()
```
### Explanation
1. S3 Storage (`S3`): Using the `S3` class, we specify the `bucket_path` where task results should be stored. Make sure to replace `"my-bucket/path/to/storage"` with your actual S3 bucket path.
2. Task Configuration (`my_task`):
- `persist_result=True`: Enable result persistence for the task.
- `result_storage=s3_storage`: Specify the S3 storage location for persisting the results.
- `cache_key_fn=cache_key_function`: Use a cache key function to generate unique keys.
- `cache_expiration=timedelta(days=1)`: Define the cache expiration period.
### AWS Configuration
Ensure that your AWS credentials are properly configured. You can set them in the environment variables or use an AWS configuration file.
#### Example Environment Variables:
```sh
export AWS_ACCESS_KEY_ID='your_access_key_id'
export AWS_SECRET_ACCESS_KEY='your_secret_access_key'
export AWS_DEFAULT_REGION='your_region'
```
This configuration ensures that even if your container is terminated, the task results are stored persistently in S3. When the flow runs again, Prefect will check the S3 storage for cached results and use them if the task inputs have not changed and the results are still within the cache expiration period.
For more information, refer to the Prefect Documentation on S3 Storage.