I tried running on both Ubuntu 20.04.2 and Windows 10; both are able to run the flow locally. I even tried using Docker and got the same error. I am using the Python package pymssql to extract data from the database and do some basic transformations with petl. It works fine when I run the flow without any agent/cloud: it's able to extract data and do the transformations. I figure it's something to do with how data is passed between tasks.
```
Unexpected error: TypeError('no default __reduce__ due to non-trivial __cinit__')
Traceback (most recent call last):
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 900, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/results/local_result.py", line 116, in write
    value = self.serializer.serialize(new.value)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "stringsource", line 2, in pymssql._mssql.MSSQLConnection.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
```
With pyodbc instead of pymssql I get a similar error:

```
Unexpected error: TypeError("cannot pickle 'pyodbc.Connection' object")
```
You can try starting the agent with

```
prefect local agent start
```

from the CLI, assuming you're using the local agent both for local testing and for backend usage. I re-read and I think you're returning the connection from a task and feeding it into the next task to use that connection. That won't work: Prefect serializes task results with cloudpickle, and database connection objects can't be pickled. You can try initializing the connection and using it inside the same task instead.
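The underlying limitation is easy to reproduce without Prefect at all: DB-API connection objects wrap OS-level state and cannot be pickled. A minimal sketch, using stdlib sqlite3 purely as a stand-in for pymssql/pyodbc:

```python
import pickle
import sqlite3

# sqlite3 stands in for pymssql/pyodbc here: a connection wraps
# OS-level state (sockets, file handles), so pickle cannot
# serialize it -- the same failure Prefect hits when it
# checkpoints a task result containing the connection.
conn = sqlite3.connect(':memory:')
try:
    pickle.dumps(conn)
except TypeError as exc:
    print(f'TypeError: {exc}')
finally:
    conn.close()
```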
```python
import pymssql
import petl as etl

def fetch_product_data():
    '''Extract product data from SQL.'''
    conn = pymssql.connect(
        config.get('DB_SERVER'),
        config.get('DB_USER'),
        config.get('DB_PASSWORD'),
        config.get('DB'),
    )
    # etl.fromdb is lazy: the returned table keeps a reference to the
    # live connection, so returning it makes Prefect try to pickle conn.
    table = etl.fromdb(conn, 'SELECT col FROM table')
    return table  # will not work
```
That way nothing has to be sent to Dask workers. You may also have to store the flow as a script if you want to do it this way.
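The "do everything inside one task" pattern can be sketched as follows, again with stdlib sqlite3 standing in for pymssql (the in-memory database and the `product` table are illustrative, not from the original flow):

```python
import pickle
import sqlite3

def fetch_product_data():
    # Open, query, materialize, and close all inside one function
    # (one Prefect task), so only plain data crosses the task boundary.
    conn = sqlite3.connect(':memory:')  # stand-in for pymssql.connect(...)
    try:
        conn.execute("CREATE TABLE product (col TEXT)")
        conn.execute("INSERT INTO product VALUES ('widget')")
        rows = conn.execute('SELECT col FROM product').fetchall()
    finally:
        conn.close()
    return rows  # plain tuples pickle cleanly between tasks

data = fetch_product_data()
print(data)  # [('widget',)]
```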