André Dias
06/07/2023, 8:08 AM
Marvin
06/07/2023, 8:09 AM
Serializer class and implement the necessary methods. Here's an example of how to create a custom pickle serializer using the PickleSerializer class as a reference:
python
from typing import Any
from typing_extensions import Literal
from prefect.packaging.base import Serializer
import pickle

class CustomPickleSerializer(Serializer):
    type: Literal["custom_pickle"] = "custom_pickle"

    # The Serializer base class declares dumps/loads as its abstract methods
    def dumps(self, obj: Any) -> bytes:
        # Implement your custom serialization logic here
        return pickle.dumps(obj)

    def loads(self, blob: bytes) -> Any:
        # Implement your custom deserialization logic here
        return pickle.loads(blob)
You can find the PickleSerializer implementation in the Prefect source code here.
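To make the round trip concrete, here is a minimal standard-library sketch of what a pickle-based serializer does under the hood. It does not import Prefect, and the helper names `pickle_to_bytes` and `unpickle_from_bytes` are hypothetical. It assumes you want a text-safe payload, so the pickled bytes are base64-encoded.

```python
import base64
import pickle
from typing import Any


def pickle_to_bytes(value: Any) -> bytes:
    """Pickle a value, then base64-encode it so the payload is ASCII-safe."""
    return base64.b64encode(pickle.dumps(value))


def unpickle_from_bytes(blob: bytes) -> Any:
    """Reverse pickle_to_bytes. Only unpickle data from a source you trust."""
    return pickle.loads(base64.b64decode(blob))


# Round-trip a small payload (illustrative data only)
payload = {"name": "example", "values": [1, 2, 3]}
blob = pickle_to_bytes(payload)
restored = unpickle_from_bytes(blob)
```

Note that `pickle.dumps` returns `bytes`, not `str`, which is why the helpers are annotated with `bytes`.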
Once you've created your custom serializer, you can use it in your Prefect flows. Just remember, with great pickling power comes great responsibility. Good luck, human!
André Dias
06/07/2023, 8:14 AM
checkpoint?
Marvin
06/07/2023, 8:15 AM
checkpoint=False, you can now configure caching using the cache_key_fn function and cache_expiration parameters. This allows you to define arbitrary caching mechanisms.
Here's an example of how to disable caching for a task in Prefect 2:
python
from prefect import task

@task(cache_key_fn=None)
def my_task():
    # Your task logic here
    pass
Setting cache_key_fn to None, which is also its default value, disables caching for the task. For more details on configuring caching in Prefect 2, you can refer to the migration guide.
Remember, a watched pot never boils, but an unchecked task might just surprise you. Good luck!
Marvin
06/07/2023, 8:16 AM
checkpoint has been replaced with caching. Caching allows a task run to reflect a finished state without actually running the code that defines the task. This can be useful for reusing results of tasks that may be expensive to run with every flow run or reusing cached results if the inputs to a task have not changed.
To enable caching for a task, you can use the cache_key_fn argument on a task. Here's an example:
python
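from prefect import flow, task

# cache_key_fn is called with the task run context and the task's parameters
@task(cache_key_fn=lambda context, parameters: "my_cache_key")
def my_task():
    # Your task logic here
    pass

# Prefect 2 defines flows with the @flow decorator, not a Flow context manager
@flow
def my_flow():
    result = my_task()

my_flow()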
In this example, the cache_key_fn is a lambda function that returns a constant cache key, "my_cache_key". You can customize this function to generate cache keys based on the task's inputs or any other criteria you'd like.
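As a rough sketch of an input-based key, the function below hashes the task's parameters with the standard library only. The name `inputs_cache_key` is hypothetical, the `context` argument is accepted only to match the two-argument shape of a cache key function, and returning None for unhashable inputs is an assumption meaning "no key". Prefect also ships a ready-made `task_input_hash` in `prefect.tasks`, which may be the better choice in practice.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def inputs_cache_key(context: Any, parameters: Dict[str, Any]) -> Optional[str]:
    """Return a stable key derived from the task's parameters.

    Hypothetical helper: `context` is not inspected here.
    """
    try:
        # sort_keys makes the key independent of keyword-argument order
        encoded = json.dumps(parameters, sort_keys=True).encode("utf-8")
    except TypeError:
        # Parameters that are not JSON-serializable: give up and return no key
        return None
    return hashlib.sha256(encoded).hexdigest()


# Same inputs in a different order produce the same key; different inputs do not
key_a = inputs_cache_key(None, {"x": 1, "y": 2})
key_b = inputs_cache_key(None, {"y": 2, "x": 1})
key_c = inputs_cache_key(None, {"x": 1, "y": 3})
```

A function like this could be passed as `cache_key_fn=inputs_cache_key`, so that a task is only re-run when its inputs change.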
So, while there isn't a direct equivalent to the checkpoint attribute in Prefect 2, you can achieve similar functionality using caching. Good luck with your caching endeavors, human.
André Dias
06/07/2023, 9:30 AM