# prefect-community
f
Hi All! I am investigating the possibility of using Prefect as a workflow management tool in hydraulic and hydrological modeling systems. While my flow runs nicely as a script, it throws an error when I register it with the local agent and start it from the server. We want to use protocol buffers to move time series data between tasks, but when starting the flow through the web UI, it throws the error below. I assume this has something to do with how data is transferred from one task to another? If so, is it not possible to pass (map?) protobufs between tasks? Would it be an adequate workaround to transform the protocol buffer messages to dictionaries?
Unexpected error: TypeError("cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object")
Traceback (most recent call last):
  File "/home/fkluiben/bin/miniconda3/envs/RodeoEnv/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/fkluiben/bin/miniconda3/envs/RodeoEnv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 934, in get_task_run_state
    state._result.store_safe_value()
  File "/home/fkluiben/bin/miniconda3/envs/RodeoEnv/lib/python3.8/site-packages/prefect/engine/result/base.py", line 126, in store_safe_value
    value = self.result_handler.write(self.value)
  File "/home/fkluiben/bin/miniconda3/envs/RodeoEnv/lib/python3.8/site-packages/prefect/engine/result_handlers/local_result_handler.py", line 81, in write
    f.write(cloudpickle.dumps(result))
  File "/home/fkluiben/bin/miniconda3/envs/RodeoEnv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 63, in dumps
    cp.dump(obj)
  File "/home/fkluiben/bin/miniconda3/envs/RodeoEnv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 548, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object
BTW, apologies for my way of describing the issue. I am sure it's pretty clumsy and inaccurate. Looking forward to learn and raise my awareness of prefect terminology!!
j
The error is happening when the result is being written to disk by the result handler (by default, pickle is used to serialize results when saving checkpoints). A few options:
• Don't configure a result handler for your flow.
• Set checkpoint=False for the task(s) returning protobufs; those tasks then won't be checkpointed to disk.
• Serialize your protobufs to bytes before returning them, then deserialize inside downstream tasks.
• Return a different object type that pickle does support.
If you want to serialize data as protobufs with pickle, you could work around this by using copyreg to register your protobuf objects so that pickle knows about them: https://docs.python.org/3/library/copyreg.html
I'd probably opt for not using protobufs, or for getting protobufs working with pickle, but any of those options is viable.
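To illustrate the copyreg suggestion above, here is a minimal, self-contained sketch. `FakeTimeSeries` is a hypothetical stand-in for a generated protobuf message class (real generated messages expose the same `SerializeToString`/`ParseFromString` methods, and carry an unpicklable descriptor, which the lambda attribute simulates here); the registration hook works identically for real protobuf classes:

```python
import copyreg
import pickle


class FakeTimeSeries:
    """Hypothetical stand-in for a generated protobuf message class."""

    def __init__(self, payload=b""):
        self.payload = payload
        # Simulate the unpicklable descriptor that real messages carry
        self._descriptor = lambda: None

    def SerializeToString(self):
        return self.payload

    def ParseFromString(self, data):
        self.payload = data


def _rebuild(cls, raw):
    # Reconstruct a message from its serialized bytes on unpickling
    msg = cls()
    msg.ParseFromString(raw)
    return msg


def _reduce_message(msg):
    # Tell pickle to store (reconstructor, args) instead of the object's attributes
    return _rebuild, (type(msg), msg.SerializeToString())


# Register the reducer; pickle consults copyreg's dispatch table for this class
copyreg.pickle(FakeTimeSeries, _reduce_message)

restored = pickle.loads(pickle.dumps(FakeTimeSeries(b"timeseries-bytes")))
print(restored.payload)  # b'timeseries-bytes'
```

The same reduce function also demonstrates the "serialize to bytes" option: only the wire bytes cross the pickle boundary, and the message is rebuilt on the other side.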
f
Hey @Jim Crist-Harif, thank you very much for your response! I'll take a look into getting protobufs to work with pickle. I think using protobufs as the data exchange format was an earlier and more fundamental design decision... so there's no way to avoid them. Re the checkpoints: why does this not happen if I just run the flow as a script, without registering it with an agent?
l
Hi @Florian K. (He/Him), checkpointing is enabled by default with server (or Prefect Cloud) and has to be opted out of with checkpoint=False on the task, whereas it is disabled by default in Core only, i.e. when you are running the flow as a script (it can be enabled globally in Core by setting prefect.config.flows.checkpointing to True). There's a bit more information at https://docs.prefect.io/core/advanced_tutorials/using-result-handlers.html#setting-up-to-handle-results; everywhere you see "Cloud" in that configuration explanation, you can imagine it also says "server"!
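For reference, the Core-only opt-in mentioned above can be set either via an environment variable or in Prefect's config file; a minimal sketch (the env-var form matches the one used later in this thread, and the TOML section name follows Prefect's section/key mapping):

```shell
# Option 1: environment variable (double underscores map to config sections)
export PREFECT__FLOWS__CHECKPOINTING=true

# Option 2: in ~/.prefect/config.toml
# [flows]
# checkpointing = true
```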
f
Thank you for the link @Laura Lorenz (she/her). So far I have tried to disable the checkpoints on multiple levels and to write my own ResultHandler. Disabling the checkpoints seems to have no effect. I set the environment variable
PREFECT__FLOWS__CHECKPOINTING=false
and my test setup looks like this:
import os

import prefect
from prefect import task, Flow

# settings and ts_helpers are project-local modules

@task(checkpoint=False)
def load_protobuff():
    refpath = os.path.join(settings.BASE_DIR,
    ...
    ts = ts_helpers.binfile2ts(refpath)
    return ts

@task(checkpoint=False)
def print_protobuff(ts):
    logger = prefect.context.get("logger")
    logger.info(f'mi: {ts.metaInfo}')
    logger.info(f'series: {ts_helpers.ts2series(ts)}')

with Flow("Logs4Development") as flow:
    tsresult = load_protobuff()
    print_protobuff(tsresult)

flow.register(project_name="PB2 Test")
When I run it on the server, load_protobuff finishes successfully, but when print_protobuff is invoked, the LocalResultHandler runs into the pickling error again. Why would it even try to write the result to disk? BTW, I also tried writing my own ResultHandler, but that's a story for another time, I guess 🙂 I appreciate all your help!!!!
l
@Florian K. (He/Him) Oh!!! I just remembered about this bug: https://github.com/PrefectHQ/prefect/issues/2438! If that is what is getting you, the fix is in master right now and should go out tomorrow in 0.11.0 release.
f
@Laura Lorenz (she/her) That 0.11.0 release looks promising, thank you! I'm just wondering, what would be the fastest way to get my hands on it? Is the release via conda/pip pretty instantaneous?
l
@Florian K. (He/Him) it has just released 21 minutes ago! I just pip install upgraded it myself, so it’s on PyPI at least!
f
Woohoo! Thanks for letting me know!
@Laura Lorenz (she/her) Just wanted to thank you once again for helping me with this issue. Got the server/agent to run by turning off the checkpoints. Probably not the best solution, but it works for now.
l
@Florian K. (He/Him) no problem, glad it's working now! If you do want to go for checkpointing later, using a pickle-able return type is the only thing that will work with our built-in Results for now, but you could definitely subclass it to serialize/deserialize your output a different way. We switched to the new Result class interface partially because the old ResultHandlers were confusing, so hopefully it should be easier now, since I know you tried writing your own result handler earlier 🙂