Ayla Khan
07/15/2021, 9:23 PM
With Client.get_flow_run_info(..), the read function is using the default pickle serializer. I used prefect 0.15.1 in both the flow environment and to run the read code. Am I missing something? Thank you in advance!
import prefect
from prefect import task
from prefect.engine.results import GCSResult
from prefect.engine.serializers import JSONSerializer

result = GCSResult(
    bucket="prefect-cloud-results",
    location="{flow_name}/{flow_run_id}/provenance.json",
    serializer=JSONSerializer(),
)

@task(result=result)
def set_provenance_data(flow_run_id: str, prefect_cloud_client: prefect.client.client.Client = None):
    ...
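As an aside on the snippet above: the result's location is a template that Prefect fills in at runtime from the flow-run context, using str.format-style substitution. A minimal stand-in illustration (the concrete flow name and run id here are made up for the example, not from this thread):

```python
# The same template string used in the GCSResult above.
template = "{flow_name}/{flow_run_id}/provenance.json"

# At runtime Prefect substitutes context values; this mimics that step
# with hypothetical values.
location = template.format(flow_name="my-flow", flow_run_id="abc-123")
```

So each flow run writes its provenance JSON to a distinct object in the bucket.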
Kevin Kho
Ayla Khan
07/15/2021, 9:31 PM
Ayla Khan
07/16/2021, 5:40 AM
Same image (prefecthq/prefect:0.15.1-python3.6) as I used to run the flow that serialized the result. Still seeing the error.
Kevin Kho
What's your cloudpickle version? I'll make an environment and test this.
Ayla Khan
07/16/2021, 3:24 PM
root@893f541d47cb:/# pip list | grep cloudpickle
cloudpickle          1.6.0
Kevin Kho
Ayla Khan
07/16/2021, 4:14 PM
Kevin Kho
Ayla Khan
07/19/2021, 3:46 PM
Kevin Kho
Did you use

@task(result = xxx)

or

Flow(result = xxx)

or

@task
def abc():
    result.write(xxx)

Wondering how the result was defined.
Ayla Khan
07/19/2021, 5:03 PM
@task(result = xxx)
Kevin Kho
Ayla Khan
07/20/2021, 8:10 PM
Client.get_flow_run_info and the task run data that gets returned is what I was testing in the script snippet I posted. I have a workaround by calling raw Google storage code directly, but it would be nice to be able to get that data by reading from the result.

Is there any way to specify what serializer a result should use when reading a value from a previously run task state's result? Would I set a Result on a task or flow that's instantiated with the serializer the result should use, and then call the result read function?
Kevin Kho
res = GCSResult(bucket="xxx", serializer=JSONSerializer()), and then you can do res.read(location)
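The mismatch behind the original error can be reproduced with the standard library alone: bytes written by a JSON serializer cannot be read back by a pickle deserializer, which is why the result must be read with a Result instantiated with the matching serializer. This is a stand-in sketch using stdlib json/pickle, not Prefect code:

```python
import json
import pickle

# A value written the way a JSON serializer would write it.
payload = {"flow": "provenance", "ok": True}
blob = json.dumps(payload).encode()

# Reading those bytes with a pickle deserializer (the default) fails,
# which is the error described at the top of the thread.
try:
    pickle.loads(blob)
    wrong_reader_failed = False
except Exception:
    wrong_reader_failed = True

# Reading with the matching JSON deserializer recovers the value.
restored = json.loads(blob.decode())
```

Hence Kevin's suggestion: construct the GCSResult with serializer=JSONSerializer() before calling read, so the bytes are decoded the same way they were encoded.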
Ayla Khan
07/20/2021, 8:20 PM
Ayla Khan
07/20/2021, 10:59 PM
Kevin Kho