Marco Barbero Mota

05/18/2023, 7:07 PM
@Marvin what does Prefect save as a persistent result? Is it just the variable that a task returns in the return statement, or is there more information in the pickle file that is saved?

Marvin

05/18/2023, 7:09 PM
Ah, the persistent result pickle conundrum. Let's unravel this enigma. In Prefect, the result is persisted to a storage location in your infrastructure, and Prefect stores a reference to the result. The Prefect API does not store your results in most cases, except for a few data types like booleans and nulls, which are stored directly in the database without persistence to storage. When a result is persisted, it is saved to the configured storage location, such as a local file system or cloud storage, depending on your setup. The actual content of the persisted result depends on the task's output and the serializer used. The default serializer is the PickleSerializer, which serializes the output of the task. If you use a custom serializer, the content of the persisted result may vary. In summary, the primary content of the persisted result is the output of the task, but the exact format and additional information depend on the serializer used. Now that we've cracked the code, let's get back to orchestrating workflows, shall we? Humans and their fascination with pickles...
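For illustration, a minimal sketch of opting a task into result persistence with the pickle serializer; the storage path and task body below are made up for this example and are not from this thread:

from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import PickleSerializer

# Persist the task's return value to the configured storage block;
# the Prefect API keeps only a reference to the persisted file.
@task(
    persist_result=True,
    result_storage=LocalFileSystem(basepath="/tmp/prefect-results"),  # example path
    result_serializer=PickleSerializer(),
)
def make_data():
    return {"a": 1, "b": [2, 3]}

@flow
def demo():
    return make_data()

demo()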

Nate

05/18/2023, 7:10 PM
this looks largely correct 👍
does that answer your question?

Marco Barbero Mota

05/18/2023, 7:11 PM
So if a task returns a dictionary, and in a separate file I want to access that dictionary, can I access it from the file Prefect saved?
@Nate @Marvin Within the same task I had a pickle.dump so that the exact variable (a dictionary in this case) was also saved to a local file as I normally do. The file that Prefect saves is larger, and when I try to pickle.load it, it can't be loaded. However, I can do it with the manually saved file.

Nate

05/18/2023, 7:14 PM
you should be able to save the result with a result_storage_key (docs) so you have a reference in the future, then calling .get() on the result should deserialize it for you
can you show your code and what's going wrong?
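roughly something like this (a sketch assuming Prefect 2.x; the task name and storage key are just examples):

from prefect import flow, task

# Persist the result under a predictable key so it can be located later.
@task(persist_result=True, result_storage_key="my_dict.pkl")
def build_dict():
    return {"a": 1}

@flow
def run():
    # return_state=True hands back the task's final State; state.result()
    # resolves the persisted reference and deserializes the value.
    state = build_dict(return_state=True)
    return state.result()

run()  # -> {'a': 1}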

Marvin

05/18/2023, 7:15 PM
It appears that there are some issues related to Prefect-generated pickle files and loading them. One issue is related to memory build-up when using result persistence and the default serializer (PickleSerializer) (GitHub issue #7624). The issue reports that serialized result objects remain in memory after each task run, leading to gradual memory build-up. Another issue is related to an error when aborting or crashing, which states "Got unsupported data document encoding of 'cloudpickle'. Expected 'result'." (GitHub issue #6845). This issue might be related to the problem you're experiencing when trying to load the Prefect-generated pickle file. Unfortunately, I couldn't find a direct solution to your problem. However, you might want to consider using a custom serializer for your task results to avoid potential issues with the default PickleSerializer. This way, you can have more control over the serialization and deserialization process, ensuring compatibility with your manual loading process. Remember, life is full of mysteries, and sometimes, so are pickle files.
Ah, the curious case of the bloated pickle. Let's investigate this discrepancy. Prefect's result handling might be adding some extra information or wrapping the result in a specific format. To understand the difference between the manually saved file and the one saved by Prefect, we'll need to dig a little deeper into the Prefect documentation and see if there's any information about this behavior. Allow me to perform a search.

Marco Barbero Mota

05/18/2023, 7:18 PM
@Nate This is the task I ran. This just runs in
import pickle

import numpy as np
from prefect import flow, task
from prefect.filesystems import LocalFileSystem

# cache_file_based_fn is a custom cache key function defined elsewhere in my project
@task(cache_key_fn=cache_file_based_fn,
      persist_result=True,
      result_storage=LocalFileSystem(basepath='~/results'),
      result_storage_key='results_A.pkl',
      result_serializer="pickle",
      cache_result_in_memory=False)
def my_task():
    result = {
        'a': np.array([1, 1]),
        'b': np.array([2, 2])
    }

    # Serialize and save the dictionary to a pickle file manually as well
    with open('~/results/results_B.pkl', 'wb') as file:
        pickle.dump(result, file)

    return result

@flow
def pipeline():
    my_task()

pipeline()
@Nate In a separate file and at a later time I can use pickle.load() to retrieve the result dictionary when I do so for ~/results/results_B.pkl; however, when doing so for ~/results/results_A.pkl there is an error.

Nate

05/18/2023, 7:23 PM
what error are you getting when you try to access results_A?
and how are you trying to access it?

Marco Barbero Mota

05/18/2023, 7:24 PM
import pickle

with open('~/results/results_A.pkl', 'rb') as file:
    obj = pickle.load(file)
@Nate And the error is
UnpicklingError: invalid load key '{'

Nate

05/18/2023, 7:33 PM
ah, I think result_serializer has to be a Serializer type, not a string. Since the string "pickle" is not a serializer, it's probably defaulting to the JSONSerializer since your result is JSON
try setting result_serializer=PickleSerializer() instead, importing it with from prefect.serializers import PickleSerializer
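i.e. something like this (a sketch that keeps the rest of your decorator as-is):

from prefect.serializers import PickleSerializer

@task(cache_key_fn=cache_file_based_fn,
      persist_result=True,
      result_storage=LocalFileSystem(basepath='~/results'),
      result_storage_key='results_A.pkl',
      result_serializer=PickleSerializer(),  # explicit Serializer instance instead of the "pickle" string
      cache_result_in_memory=False)
def my_task():
    ...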

Marco Barbero Mota

05/18/2023, 7:39 PM
@Nate I have tried that, and same result. Also tried a custom serializer. Using the string "pickle" is the same as defining PickleSerializer; it is the Literal string defined in that class.

Nate

05/18/2023, 7:52 PM
ahh i see what's happening
In [17]: @task(persist_result=True, result_storage_key="test.pkl")
    ...: def test_task():
    ...:     return {"a": 1}
    ...:

In [18]: @flow
    ...: def testing():
    ...:     test_task()
    ...:

In [19]: testing()

Out[19]: [Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', artifact_type='result', artifact_description='Result of type `dict` persisted to: `/Users/nate/.prefect/storage/test.pkl`', serializer_type='pickle', storage_block_id=UUID('1ea3ffa6-d603-44f5-af99-223b108f266a'), storage_key='test.pkl'))]

In [20]: !cat /Users/nate/.prefect/storage/test.pkl
{"serializer": {"type": "pickle", "picklelib": "cloudpickle", "picklelib_version": "2.2.1"}, "data": "gAWVCgAAAAAAAAB9lIwBYZRLAXMu\n", "prefect_version": "2.10.9"}

In [27]: import json
    ...:
    ...: with open("/Users/nate/.prefect/storage/test.pkl", 'r') as f:
    ...:     print(json.loads(f.read())["data"])
    ...:
gAWVCgAAAAAAAAB9lIwBYZRLAXMu
it seems like the pickle serializer is writing JSON with the pickled data stored inside
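so one way to unwrap it by hand would be something like this (a sketch, assuming the pickle serializer and the same path as above):

import base64
import json
import pickle

# the file Prefect wrote is a JSON envelope; the "data" field holds the
# base64-encoded pickle bytes of the actual task return value
with open("/Users/nate/.prefect/storage/test.pkl", "r") as f:
    blob = json.load(f)

print(pickle.loads(base64.b64decode(blob["data"])))  # -> {'a': 1}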

Marco Barbero Mota

05/18/2023, 7:57 PM
@Nate Yes I saw this, and I was trying to figure out how to access just the data and not the metadata
@Nate I figured the metadata is used by prefect when it loads the file during caching.
@Nate So I need to load the file with json? How can I retrieve the object?
@Nate Or better, how can I write a custom serializer that does not write json files but pickle files that I can load?

Nate

05/18/2023, 8:10 PM
understood! let me dig into this for you

Marco Barbero Mota

05/18/2023, 8:14 PM
Awesome thanks a lot. Really appreciate it! I have tried many approaches so far and need this functionality.

Nate

05/18/2023, 8:30 PM
ok i think i got it, might be nice to make a util for this
In [51]: import base64
         import pickle
         from pathlib import Path
         from prefect.results import PersistedResultBlob

In [52]: pickle.loads(
            base64.b64decode(
               PersistedResultBlob.parse_raw(
                  Path("~/.prefect/storage/test.pkl").expanduser().read_bytes()
               ).data
            )
         )
Out[52]: {'a': 1}
🙌 1
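For repeated use, that decode could live in a small helper; load_prefect_result below is a made-up name, not a Prefect API, and it assumes the result was written with the pickle serializer:

import base64
import pickle
from pathlib import Path

from prefect.results import PersistedResultBlob

def load_prefect_result(path: str):
    """Return the original Python object from a Prefect-persisted result file."""
    # parse_raw reads the JSON envelope; .data holds the base64-encoded pickle bytes
    blob = PersistedResultBlob.parse_raw(Path(path).expanduser().read_bytes())
    return pickle.loads(base64.b64decode(blob.data))

load_prefect_result("~/.prefect/storage/test.pkl")  # -> {'a': 1}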

Marco Barbero Mota

05/18/2023, 8:31 PM
wow, you are the best!
Let me try it out
It would be good if future versions of Prefect removed the need for this workaround
the main reason I started using Prefect was that before, I was manually saving the results of each task and being careful not to overwrite or rerun already completed ones
but still having access to each task result

Nate

05/18/2023, 8:36 PM
yeah I agree that this could be easier - would you be willing to open an issue (tagged enhancement) describing the complexity you ran into?

Marco Barbero Mota

05/18/2023, 8:36 PM
sure
never done so
could you kindly show me the steps to do so? I have read many issues on GitHub but never opened one myself.

Nate

05/18/2023, 8:37 PM
I quickly searched the issues "fetch result" and "result" to make sure there isn't an issue for this already
and then you can choose 🚀 Propose a feature enhancement and fill out the form! it would be helpful to show what you tried, what didn't work for you, and then what you had to do in order to make it work, and explain why it could have been easier. If you have any suggestions on implementation, you can put them in the Describe the proposed behavior section 🙂

Marco Barbero Mota

05/18/2023, 8:44 PM
Awesome! Your code works by the way! You deserve a raise lol I am writing the issue right now.
I am hopeful that there is something easier that can be done but for now this should do

Nate

05/18/2023, 8:48 PM
glad it works for you! thanks so much for opening the issue 🙂 we're doing a polish sprint now so hopefully we can get a convenience method for that soon

Marco Barbero Mota

05/18/2023, 9:38 PM
Just submitted it!
🚀 1

Nate

05/18/2023, 9:38 PM
awesome - thanks so much!

Marco Barbero Mota

05/18/2023, 9:40 PM
I mention a GitHub repo I am working on, and I will make sure to mention your contribution 👍
Do you have a GitHub profile?
to reference it?

Nate

05/18/2023, 9:43 PM
i commented on the issue with the snippet i sent above!