Paul Gierz
10/09/2024, 8:50 AM
…`task`, and passes data from one to the next. Transformations on the data are defined by an object called `Rule`, which has some attributes (think of it as a dictionary, more or less). So far, I have tasks that are decorated like this:
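```python
from datetime import timedelta
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

@task(cache_expiration=timedelta(days=1), cache_policy=TASK_SOURCE + INPUTS)
def my_function(data, rule_spec):
    ...  # do something with data, based upon what is defined in the Rule object
    return data
```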
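Conceptually, `cache_policy=TASK_SOURCE + INPUTS` means a cached result is reused only when both the task's source code and its inputs are unchanged. The sketch below illustrates that idea with a hypothetical `illustrative_cache_key` helper built on `hashlib`; it is not Prefect's actual key computation, and the `rule_spec` values are made up. The 32-character hex filenames in the cache directory look like keys of this general kind.

```python
import hashlib
import json


def illustrative_cache_key(source: str, inputs: dict) -> str:
    """Hypothetical helper: hash a task's source text together with its inputs.

    Only mimics the idea behind TASK_SOURCE + INPUTS; not Prefect's algorithm.
    """
    # sort_keys makes the key independent of the order the inputs are given in.
    payload = json.dumps(inputs, sort_keys=True, default=repr)
    digest = hashlib.md5()
    digest.update(source.encode("utf-8"))
    digest.update(b"\0")  # separator between source and inputs
    digest.update(payload.encode("utf-8"))
    return digest.hexdigest()


source = "def my_function(data, rule_spec): ..."
k1 = illustrative_cache_key(source, {"data": [1, 2, 3], "rule_spec": {"name": "example_rule"}})
k2 = illustrative_cache_key(source, {"rule_spec": {"name": "example_rule"}, "data": [1, 2, 3]})
k3 = illustrative_cache_key(source, {"data": [1, 2, 4], "rule_spec": {"name": "example_rule"}})
print(k1 == k2, k1 == k3)  # True False
```

Same source and same inputs give the same key (a cache hit); changing either the inputs or the task's code gives a new key, so the task runs again.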
This works great, and I have the following setup in my run script:
```bash
#!/bin/bash -e
export PREFECT_SERVER_API_HOST=0.0.0.0
export PREFECT_LOCAL_STORAGE_PATH="./task-cache/"
prefect server start &
time python my_workflow.py
```
I get files in the expected location:
```console
$ ls -lah ./task-cache/
total 5.0G
drwxr-sr-x 2 a270077 ab0246 4.0K Oct 9 10:03 .
drwxr-sr-x 5 a270077 ab0246  84K Oct 9 10:02 ..
-rw-r--r-- 1 a270077 ab0246  327 Oct 9 10:03 1d33fdca8b1fb0b2f599d4a5f5baeabb
-rw-r--r-- 1 a270077 ab0246  15M Oct 9 10:05 2772efda3019a1630f1da30c53480b1c
-rw-r--r-- 1 a270077 ab0246 9.4M Oct 9 10:05 33f19ada24e79b8ba27a4507b20c903d
-rw-r--r-- 1 a270077 ab0246 2.5G Oct 9 10:03 6c66febdf523c0a30897ed4ee57edf7d
-rw-r--r-- 1 a270077 ab0246 2.5M Oct 9 10:05 9a5ff13448c0e86425d82e0855557725
-rw-r--r-- 1 a270077 ab0246 2.5G Oct 9 10:03 bf43304fee052e08aed9ef0402fe2791
-rw-r--r-- 1 a270077 ab0246 6.4K Oct 9 09:57 da6c58da3aa841659f1aced8e7dbc535
-rw-r--r-- 1 a270077 ab0246 2.5M Oct 9 10:05 e96bf03d4145ed875c25824a29cd8cc7
```
And these files look like JSON:
```console
$ jq . ./task-cache/1d33fdca8b1fb0b2f599d4a5f5baeabb
{
  "metadata": {
    "storage_key": "/work/ab0246/a270077/SciComp/Projects/pymorize/examples/task-cache/1d33fdca8b1fb0b2f599d4a5f5baeabb",
    "expiration": "2024-10-10T08:02:48.074715Z",
    "serializer": {
      "type": "pickle",
      "picklelib": "cloudpickle",
      "picklelib_version": null
    },
    "prefect_version": "3.0.1",
    "storage_block_id": null
  },
  "result": "gAVOLg==\n"
}
```
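The `result` field of a record like this can be decoded by hand with the standard library. This sketch hard-codes the record shown above, trimmed to the fields it uses. `is_expired` is a hypothetical helper; reading `expiration` as "presumably the write time plus `cache_expiration`" and treating a missing expiration as "never expires" are assumptions, not documented Prefect behavior. The decoded bytes turn out to be a protocol-5 pickle of `None`, so this particular 327-byte record stores `None`.

```python
import base64
import pickle
from datetime import datetime, timezone

# The record from the jq output above, trimmed to the fields used here.
record = {
    "metadata": {
        "expiration": "2024-10-10T08:02:48.074715Z",
        "serializer": {"type": "pickle", "picklelib": "cloudpickle"},
    },
    "result": "gAVOLg==\n",
}

raw = base64.b64decode(record["result"])  # non-alphabet chars like "\n" are discarded
print(raw)  # b'\x80\x05N.'  (protocol 5 header, NONE opcode, STOP)
value = pickle.loads(raw)  # plain pickle is enough for this payload
print(value)  # None


def is_expired(metadata, now=None):
    """Hypothetical helper: compare the record's expiration with `now` (UTC)."""
    expiration = metadata.get("expiration")
    if expiration is None:
        return False  # assumption: no expiration recorded means it never expires
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    expires_at = datetime.fromisoformat(expiration.replace("Z", "+00:00"))
    return (now or datetime.now(timezone.utc)) >= expires_at


print(is_expired(record["metadata"], datetime(2024, 10, 9, 12, 0, tzinfo=timezone.utc)))  # False
```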
My question is: how can I load up this object in an interactive Python session and play around with it? Prefect must be able to do this internally to restart from a cached state.
Paul Gierz
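10/09/2024, 9:36 AM
```python
import base64
import importlib
import json

def inspect_result(result):
    """Load a Prefect result file (JSON metadata + base64 payload) and unpickle it."""
    with open(result, "r") as file:
        cached_data = json.load(file)
    serializer = cached_data["metadata"]["serializer"]
    if serializer["type"] != "pickle":
        raise ValueError(f"unsupported serializer type: {serializer['type']!r}")
    # Unpickle with the library named in the metadata (e.g. cloudpickle), else stdlib pickle.
    picklelib = importlib.import_module(serializer.get("picklelib") or "pickle")
    pickled_result = base64.b64decode(cached_data["result"])
    return picklelib.loads(pickled_result)
```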
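To work out which hashed file holds which result, one could scan the whole cache directory. A minimal sketch, assuming every file in it is a JSON record with the layout shown above; `load_record` and `summarize_cache` are hypothetical names. `cloudpickle` must be importable for records whose metadata names it, and because every file is read and unpickled in full, the 2.5 GB records will cost time and memory.

```python
import base64
import importlib
import json
from pathlib import Path


def load_record(path):
    """Hypothetical helper: return (metadata, value) for one result file."""
    with open(path, "r") as file:
        record = json.load(file)
    metadata = record["metadata"]
    serializer = metadata["serializer"]
    if serializer.get("type") != "pickle":
        raise ValueError(f"unsupported serializer type: {serializer.get('type')!r}")
    # Use the pickle library recorded in the metadata, falling back to stdlib pickle.
    picklelib = importlib.import_module(serializer.get("picklelib") or "pickle")
    return metadata, picklelib.loads(base64.b64decode(record["result"]))


def summarize_cache(directory):
    """Hypothetical helper: one row per cached file, largest file first.

    Each row is (file name, size in bytes, type name of the stored value, expiration).
    """
    files = [p for p in Path(directory).iterdir() if p.is_file()]
    files.sort(key=lambda p: p.stat().st_size, reverse=True)
    rows = []
    for path in files:
        metadata, value = load_record(path)
        rows.append((path.name, path.stat().st_size, type(value).__name__, metadata.get("expiration")))
    return rows
```

For example, `for row in summarize_cache("./task-cache/"): print(*row)` would print one line per cached file, which makes it easier to match the hashed names to the tasks that produced them.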