Piotr
07/04/2025, 8:53 PM

Marvin
07/04/2025, 8:55 PM

1. Basic Caching
```python
from prefect import task
from prefect.tasks import task_input_hash

# Cache results based on a hash of the task's inputs
@task(cache_key_fn=task_input_hash)
def my_task():
    return "result"
```
2. Cache Expiration
```python
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

# The cached value expires one hour after it is written
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def my_task():
    return "result"
```
3. Custom Cache Keys
```python
from prefect import task

def custom_key_fn(context, parameters):
    # Prefect passes the task run context and a dict of the task's inputs
    return f"my-prefix-{hash(frozenset(parameters.items()))}"

@task(cache_key_fn=custom_key_fn)
def my_task(param1, param2):
    return f"{param1}-{param2}"
```
### Cache Policies
Prefect 3 provides several built-in cache policies:
- `TaskSource`: Cache based on the task's source code
- `Inputs`: Cache based on task input values
- `FlowParameters`: Cache based on flow parameter values
- `RunId`: Cache based on the flow run ID
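As a quick sketch of how these are used (assuming your Prefect 3 version exposes the `prefect.cache_policies` module and the `cache_policy` task argument):

```python
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

# Cache on both the task's inputs and its source code;
# policies can be combined with the + operator.
@task(cache_policy=INPUTS + TASK_SOURCE)
def my_task(x):
    return x * 2
```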
### Storage Configuration
For distributed environments like yours (self-hosted with workers), you'll need to configure a shared storage location for caches to be accessible across workers. This can be done by:
1. Creating a storage block (e.g., S3, Azure, GCS)
2. Configuring the storage in your task:
```python
from prefect import task
from prefect.tasks import task_input_hash

@task(
    cache_key_fn=task_input_hash,
    persist_result=True,
    result_storage="s3/my-bucket",  # slug of your storage block
)
def my_cached_task():
    return "result"
```
### Important Considerations
1. Cache Refresh: You can control cache refreshing (see the sketch after this list):
- Globally via the `PREFECT_TASKS_REFRESH_CACHE` setting
- Per task using the `refresh_cache` argument (`True` to force a refresh, `False` to opt out of a global refresh)
2. Distributed Environments: For your self-hosted setup:
- Ensure all workers can access the same storage location
- Use a consistent storage block across all workers
- Consider network latency when choosing cache expiration times
3. Cache Key Design: Choose cache keys carefully:
- They should be deterministic
- Consider including relevant parameters that affect the output
- Avoid overly specific keys that defeat the purpose of caching
4. Storage Requirements: Ensure your storage solution:
- Has sufficient capacity for your cached results
- Is accessible to all workers
- Has appropriate read/write permissions
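A minimal sketch of the cache refresh controls from point 1 (assuming the `PREFECT_TASKS_REFRESH_CACHE` setting and the `refresh_cache` task argument described above):

```python
from prefect import task
from prefect.tasks import task_input_hash

# Opts out of refreshing even if PREFECT_TASKS_REFRESH_CACHE=true is set globally
@task(cache_key_fn=task_input_hash, refresh_cache=False)
def stable_task(x):
    return x + 1

# Always recomputes and overwrites its cached value
@task(cache_key_fn=task_input_hash, refresh_cache=True)
def always_fresh_task(x):
    return x + 1
```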
For more detailed information, you can refer to the caching concepts documentation and the how-to guide for caching workflow steps.

Piotr
07/04/2025, 8:59 PM

Marvin
07/04/2025, 9:00 PM

- Must be explicitly enabled via `cache_key_fn`
- Needs persistent storage configuration for distributed environments
### Example Showing the Difference
```python
from prefect import flow, task

@task
def task1():
    return "result1"

@task
def task2(upstream_result):
    # This task fails
    raise ValueError("Something went wrong")

@flow(retries=1)  # retries enable the behavior described below
def my_flow():
    result = task1()
    task2(result)  # This fails

# When this flow runs:
# 1. task1 executes and its result is stored in memory
# 2. task2 fails
# 3. On the flow retry, task1 won't re-execute because its result is persisted
# 4. task2 will retry with task1's stored result
```
Versus caching:
```python
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)  # Explicitly enable caching
def task1():
    return "result1"

@task
def task2(upstream_result):
    return "result2"

@flow
def my_flow():
    result = task1()
    task2(result)

# Now:
# - If you run this flow multiple times
# - task1 will only execute once
# - Subsequent runs will use the cached result
```
### Long-term Persistence
If you want results to persist beyond a single flow run, you have two options:
1. Enable global result persistence:
```bash
# In your Prefect configuration
PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
```
2. Configure persistence per task:
```python
from prefect import task

@task(persist_result=True)
def my_task():
    return "result"
```
This distinction between result persistence and caching is important for understanding how Prefect handles task results and retries. The behavior you're seeing with retries is a feature of Prefect's result persistence system, which is different from its caching system.
For more details, you can check out the Results documentation and Caching concepts.

Piotr
07/04/2025, 9:02 PM

Marvin
07/04/2025, 9:03 PM

1. Globally:

```bash
PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
```
2. Per task:
```python
from prefect import task

@task(persist_result=True)
def my_task():
    return "result"
```
### Storage Location
When persistence is enabled, results are stored in:
- Default: ~/.prefect/storage/
- Can be configured to use other storage blocks (S3, GCS, etc.)
- For your self-hosted setup, you should configure a shared storage location accessible to all workers
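For example, a minimal sketch of pointing results at an explicit location (the path below is illustrative; any filesystem all workers can reach works):

```python
from prefect import task
from prefect.filesystems import LocalFileSystem

# Override the default ~/.prefect/storage/ location with a shared mount
shared_storage = LocalFileSystem(basepath="/mnt/shared/prefect-results")

@task(persist_result=True, result_storage=shared_storage)
def my_task():
    return "result"
```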
### Serialization Requirements
For custom objects to be persisted, they need to be serializable. Prefect supports two main serialization methods:
1. JSON Serialization (Default)
- Objects must be JSON-serializable
- Custom objects need to implement one of:
```python
# Option 1: to_dict() method
class MyObject:
    def to_dict(self):
        return {"key": "value"}

# Option 2: JSON serialization methods
class MyObject:
    def __json__(self):
        return {"key": "value"}
```
2. Pickle Serialization
- More flexible but less secure
- Custom objects need to implement:
```python
class MyObject:
    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, state):
        self.__dict__.update(state)
```
### Example with Custom Object
```python
from prefect import task, flow

class MyCustomObject:
    def __init__(self, value):
        self.value = value

    def to_dict(self):
        return {"value": self.value}

    @classmethod
    def from_dict(cls, data):
        return cls(data["value"])

@task(persist_result=True)
def create_custom_object():
    return MyCustomObject("test")

@flow
def my_flow():
    result = create_custom_object()
    return result
```
### Best Practices
1. Storage Configuration
- For distributed environments (like your self-hosted setup), configure a shared storage block:
```python
from prefect import task

@task(
    persist_result=True,
    result_storage="s3/my-bucket",  # slug of your storage block
)
def my_task():
    return MyCustomObject("test")
```
2. Serialization
- Keep objects simple and serializable
- Use built-in Python types when possible
- Implement proper serialization methods for custom objects
- Consider using JSON serialization for better interoperability (see the sketch after this list)
3. Performance
- Only persist results when necessary
- Be mindful of storage space when persisting large objects
- Consider cleanup strategies for old results
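A small sketch of the serialization choice from point 2 (assuming the `result_serializer` task argument accepts the built-in "json" and "pickle" serializer names):

```python
from prefect import task

# JSON keeps persisted results human-readable and interoperable
@task(persist_result=True, result_serializer="json")
def produces_simple_data():
    return {"count": 3, "status": "ok"}

# Pickle handles arbitrary Python objects at the cost of portability
@task(persist_result=True, result_serializer="pickle")
def produces_custom_object():
    return MyCustomObject("test")  # the class from the example above
```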
For more detailed information, you can refer to the Results documentation and the API reference for results.

Piotr
07/04/2025, 9:08 PM

Marvin
07/04/2025, 9:11 PM

1. Worker Role:
```python
from prefect import flow

# task1 and task2 are assumed to be defined with @task elsewhere
@flow
def my_flow():
    result1 = task1()         # Stored in worker memory
    result2 = task2(result1)  # result1 is accessed from worker memory
```
- All intermediate results stay in the worker's RAM
- Results are available for downstream tasks within the same flow run
- The worker manages the entire flow execution context
2. Server Role:
- Receives task state updates (success, failure, etc.)
- Stores metadata about task runs
- Does NOT store the actual task results
- Coordinates worker activities and flow scheduling
### Important Implications
1. Memory Management:
- Large task results can impact worker memory usage
- Workers need sufficient RAM to handle all results in a flow run
- Memory is freed when the flow run completes
2. Worker Failures:
- If a worker fails during execution, in-memory results are lost
- This is why persistence is important for production workflows
- Persisted results can be recovered even if a worker fails
3. Multiple Workers:
- Each worker maintains its own memory space
- Results from one worker are not automatically available to other workers
- For sharing results between workers, you need to enable persistence with shared storage
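A rough sketch of point 3 (this assumes the prefect-aws collection is installed and that an S3Bucket block named "shared-results" has already been created; both names are illustrative):

```python
from prefect import task
from prefect_aws.s3 import S3Bucket

# Every worker loads the same block, so a result persisted by one
# worker can be read by a retry or downstream run on another.
shared_bucket = S3Bucket.load("shared-results")

@task(persist_result=True, result_storage=shared_bucket)
def shared_result_task():
    return "result"
```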
### Best Practices
1. Memory Considerations:
```python
from prefect import flow

# task_with_large_output and process_result are assumed to be @task functions
@flow
def memory_conscious_flow():
    # Be careful with large intermediate results
    big_result = task_with_large_output()
    process_result(big_result)
    del big_result  # Clear from memory if no longer needed
```
2. Production Setups:
- Enable result persistence for critical workflows
- Configure appropriate storage blocks for result persistence
- Monitor worker memory usage
- Scale worker resources based on your workflow's memory requirements
3. Worker Configuration:
```yaml
# Example worker Helm values
worker:
  resources:
    requests:
      memory: "1Gi"
    limits:
      memory: "2Gi"
```
- Ensure workers have sufficient memory allocation
- Consider workflow memory requirements when sizing workers
This architecture means that task results are managed locally by the executing worker, which is more efficient than sending large results back and forth to the server. The server focuses on orchestration and coordination rather than result storage.

Piotr
07/04/2025, 9:34 PM

Marvin
07/04/2025, 9:35 PM
```python
from prefect import flow

# task1 and task2 are assumed to be defined with @task elsewhere
@flow
def my_flow():
    result1 = task1()  # Result stored in flow run pod's RAM
    result2 = task2()  # Fails
    # Pod stays alive, result1 is still in memory
    # When task2 retries, it can still access result1
```
When a task fails:
1. The flow run pod remains active
2. All previous task results are preserved in the pod's memory
3. The retry occurs in the same pod
4. Previous task results are immediately available for the retry
### Memory Management
```yaml
# Example Kubernetes flow run pod configuration
apiVersion: v1
kind: Pod
spec:
  containers:
    - name: flow-run
      resources:
        requests:
          memory: "1Gi"
        limits:
          memory: "2Gi"
```
- Memory needs to be sized for the entire flow's lifecycle
- All intermediate results stay in the pod's memory
- Memory is only freed when the flow run completes or fails permanently
### Best Practices for K8s Deployments
1. Resource Planning:
- Size pod memory based on your flow's total memory needs
- Consider all intermediate results that need to be held
- Account for parallel task execution if applicable
2. Failure Handling:
```python
from prefect import task

@task(retries=3)
def my_task():
    # Task results stay in pod memory during retries
    return "result"
```
3. Long-Running Flows:
- Be mindful of pod memory usage over time
- Consider clearing large intermediate results when no longer needed
- Use persistence for critical results that shouldn't be lost if the pod fails
4. Pod Lifecycle:
- Pods remain until flow completion or permanent failure
- Configure appropriate timeouts in your Kubernetes deployment
- Monitor pod resources during long-running flows
This model is more efficient than creating separate pods for each task, as it:
- Reduces overhead
- Maintains state more effectively
- Provides faster access to intermediate results
- Simplifies retry handling
Thank you for the correction - it's important to understand that the worker is primarily an orchestrator, while the actual execution and result storage happen within the flow run pod itself.

Piotr
07/04/2025, 9:44 PM

Marvin
07/04/2025, 9:45 PM

1. Via the work pool's base job template:
```json
{
  "job_configuration": {
    "spec": {
      "template": {
        "spec": {
          "containers": [
            {
              "name": "flow",
              "resources": {
                "requests": {
                  "cpu": "100m",
                  "memory": "512Mi"
                },
                "limits": {
                  "cpu": "1000m",
                  "memory": "1Gi"
                }
              }
            }
          ]
        }
      }
    }
  }
}
```
2. Via Python:
```python
from prefect.infrastructure import KubernetesJob

job = KubernetesJob(
    job_configuration={
        "spec": {
            "template": {
                "spec": {
                    "containers": [{
                        "name": "flow",
                        "resources": {
                            "requests": {
                                "cpu": "100m",
                                "memory": "512Mi"
                            },
                            "limits": {
                                "cpu": "1000m",
                                "memory": "1Gi"
                            }
                        }
                    }]
                }
            }
        }
    }
)
```
### Default Values
- There are no default resource limits or requests set by Prefect
- If not specified, Kubernetes will:
- Run pods without resource limits
- Set requests to zero
- Allow pods to consume any available resources on the node
### Best Practices
1. Always Set Both Requests and Limits:
```json
{
  "requests": {
    "cpu": "100m",     // Request 0.1 CPU cores
    "memory": "512Mi"  // Request 512 MiB memory
  },
  "limits": {
    "cpu": "1000m",    // Limit to 1 CPU core
    "memory": "1Gi"    // Limit to 1 GiB memory
  }
}
```
2. Consider Your Workflow Needs:
- Memory-intensive tasks need higher memory limits
- CPU-intensive tasks need higher CPU limits
- Set requests based on typical usage
- Set limits based on maximum expected usage
3. Monitor Resource Usage:
- Start with conservative limits
- Monitor actual usage
- Adjust based on observed patterns
- Consider using vertical pod autoscaling
4. Environment-Specific Settings:
```python
from prefect import flow
from prefect.infrastructure import KubernetesJob

def get_resource_config(env):
    if env == "production":
        return {
            "requests": {"cpu": "500m", "memory": "1Gi"},
            "limits": {"cpu": "2000m", "memory": "2Gi"},
        }
    return {
        "requests": {"cpu": "100m", "memory": "512Mi"},
        "limits": {"cpu": "500m", "memory": "1Gi"},
    }

# Build the infrastructure with environment-specific resources and attach
# it to the flow's deployment rather than to the flow function itself.
job = KubernetesJob(
    job_configuration={
        "spec": {
            "template": {
                "spec": {
                    "containers": [{
                        "name": "flow",
                        "resources": get_resource_config("production"),
                    }]
                }
            }
        }
    }
)

@flow
def my_flow():
    pass
```
Remember that these settings affect:
- How Kubernetes schedules your pods
- Resource allocation on nodes
- Pod eviction behavior
- Overall cluster resource management
For your self-hosted setup, make sure to align these settings with your cluster's capacity and other workloads' requirements.