Michael Reynolds
07/19/2022, 5:32 PM
from prefect import flow, task

class MyHeavyProcessor:
    def init(self):
        # this is a super costly operation
        pass

    def _do_something(self, messages: list) -> list:
        ##
        pass

    def process(self, messages: list) -> list:
        return self._do_something(messages)

@task(name='execute-heavy-processor')
def execute_processors(messages: list):
    processor = MyHeavyProcessor()
    processor.init()
    return processor.process(messages)

@task(name='execute-something-else')
def execute_something_else(message):
    pass

@flow(name='my-flow')
def my_flow():
    # my_source is not defined in this snippet
    output = execute_processors(my_source.poll_messages())
    for m in output:
        execute_something_else(m)
MyHeavyProcessor fails because my heavy processor has a whole bunch of stuff that is probably not easily serializable.
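To make the serialization complaint concrete, here is a minimal sketch of how an object holding a non-picklable resource fails under `pickle` while its plain-list output does not. `HeavyProcessorStandIn` and `is_picklable` are hypothetical names, and the `threading.Lock` is only an assumed example of "stuff that is not easily serializable"; the real `MyHeavyProcessor` may fail for other reasons.

```python
import pickle
import threading


class HeavyProcessorStandIn:
    """Hypothetical stand-in for MyHeavyProcessor (not the real class)."""

    def __init__(self):
        # Assumption: a lock mimics an unpicklable resource held by the processor.
        self._lock = threading.Lock()

    def process(self, messages: list) -> list:
        with self._lock:
            return [m.upper() for m in messages]


def is_picklable(obj) -> bool:
    """Return True if pickle.dumps succeeds on obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


processor = HeavyProcessorStandIn()
processor_ok = is_picklable(processor)                    # the object itself
result_ok = is_picklable(processor.process(["a", "b"]))   # its plain-list output
```

Under these assumptions only the processor object is the problem, so anything that tries to persist or ship the object itself would fail, while passing its results around would not.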
Kevin Kho
07/19/2022, 6:04 PM
checkpoint=False to work, which is why I think you brought that up a while ago, right? We need a task that can return the singleton without pickling it.
Michael Reynolds
07/19/2022, 6:04 PM
Kevin Kho
07/19/2022, 6:05 PM
Michael Reynolds
07/19/2022, 6:06 PM
Kevin Kho
07/19/2022, 6:08 PM
Michael Reynolds
07/19/2022, 6:09 PM
Kevin Kho
07/19/2022, 6:09 PM
Michael Reynolds
07/19/2022, 6:09 PM
Kevin Kho
07/19/2022, 6:09 PM
Michael Reynolds
07/19/2022, 8:21 PM
Kevin Kho
07/19/2022, 9:22 PM
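For reference, one way to avoid pickling the processor at all is to keep it as a per-process singleton and only ever pass plain data between tasks. The sketch below uses `functools.lru_cache` from the standard library for that; it is a different route from a task that returns the singleton itself, and it is not something the thread confirms as the chosen fix. `HeavyProcessorStandIn` and `get_processor` are hypothetical names, and the function is left undecorated so the sketch runs without Prefect installed.

```python
from functools import lru_cache


class HeavyProcessorStandIn:
    """Hypothetical stand-in for MyHeavyProcessor (not the real class)."""

    instances_created = 0  # counts how often the costly setup ran

    def __init__(self):
        HeavyProcessorStandIn.instances_created += 1

    def process(self, messages: list) -> list:
        return [m.upper() for m in messages]


@lru_cache(maxsize=1)
def get_processor() -> HeavyProcessorStandIn:
    """Build the processor on first call, then reuse it within this process."""
    return HeavyProcessorStandIn()


def execute_processors(messages: list) -> list:
    # In the thread this function carries a @task decorator; it is omitted
    # here (assumption) so the example has no third-party dependency.
    return get_processor().process(messages)


first = execute_processors(["a"])
second = execute_processors(["b", "c"])
```

The cache lives in the memory of a single Python process, so with a distributed or multi-process runner each worker would build its own processor once rather than sharing one.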