# ask-marvin
a
@Marvin how do I create my own Custom Pickle Serializer?
m
Ah, the art of pickling. To create your own custom pickle serializer in Prefect, you can subclass the `Serializer` class and implement the necessary methods. Here's an example of how to create a custom pickle serializer using the `PickleSerializer` class as a reference:
```python
import pickle
from typing import Any

from typing_extensions import Literal

from prefect.packaging.base import Serializer


class CustomPickleSerializer(Serializer):
    type: Literal["custom_pickle"] = "custom_pickle"

    def serialize(self, value: Any) -> bytes:
        # Implement your custom serialization logic here;
        # pickle.dumps returns bytes, not str
        return pickle.dumps(value)

    def deserialize(self, value: bytes) -> Any:
        # Implement your custom deserialization logic here
        return pickle.loads(value)
```
You can find the `PickleSerializer` implementation in the Prefect source code. Once you've created your custom serializer, you can use it in your Prefect flows. Just remember, with great pickling power comes great responsibility. Good luck, human!
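To see the serialize/deserialize roundtrip in isolation, here is a minimal standalone sketch: the Prefect base class is replaced with a plain Python class so it runs without a Prefect installation, but the two methods are the same as in the subclass above.

```python
import pickle
from typing import Any


class CustomPickleSerializer:
    # Standalone stand-in for the Prefect Serializer subclass above,
    # so the roundtrip can be demonstrated without Prefect installed.
    type = "custom_pickle"

    def serialize(self, value: Any) -> bytes:
        return pickle.dumps(value)

    def deserialize(self, value: bytes) -> Any:
        return pickle.loads(value)


serializer = CustomPickleSerializer()
data = {"answer": 42, "items": [1, 2, 3]}
restored = serializer.deserialize(serializer.serialize(data))
# restored is an equal copy of data
```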
a
@Marvin, also, what’s the Prefect 2 equivalent of the Task attribute `checkpoint`?
m
In Prefect 2, the concept of checkpoints has been replaced with caching. Instead of using `checkpoint=False`, you can now configure caching using the `cache_key_fn` function and `cache_expiration` parameters. This allows you to define arbitrary caching mechanisms. Here's an example of how to disable caching for a task in Prefect 2:
```python
from prefect import task

@task(cache_key_fn=None)
def my_task():
    # Your task logic here
    ...
```
Setting `cache_key_fn` to `None` (which is also the default) effectively disables caching for the task. For more details on configuring caching in Prefect 2, you can refer to the migration guide. Remember, a watched pot never boils, but an unchecked task might just surprise you. Good luck!
In Prefect 2, the concept of `checkpoint` has been replaced with caching. Caching allows a task run to reflect a finished state without actually running the code that defines the task. This can be useful for reusing results of tasks that may be expensive to run with every flow run, or for reusing cached results when the inputs to a task have not changed. To enable caching for a task, you can use the `cache_key_fn` argument on a task. Here's an example:
```python
from prefect import flow, task

@task(cache_key_fn=lambda *args, **kwargs: "my_cache_key")
def my_task():
    # Your task logic here
    ...

# Prefect 2 uses the @flow decorator rather than the
# Prefect 1 `with Flow(...)` / `flow.run()` API
@flow
def my_flow():
    result = my_task()

if __name__ == "__main__":
    my_flow()
```
In this example, the `cache_key_fn` is a lambda function that returns a constant cache key, `"my_cache_key"`. You can customize this function to generate cache keys based on the task's inputs or any other criteria you'd like. So, while there isn't a direct equivalent to the `checkpoint` attribute in Prefect 2, you can achieve similar functionality using caching. Good luck with your caching endeavors, human.
a
Do you know what’s the best practice for distributed tracing in a Flow for Datadog?