PKay

    1 year ago
    Having a weird error when running a flow from Prefect Cloud. I can run the flow locally and register it to Cloud, but when I trigger a run from Cloud, an error stops one of the tasks from finishing correctly.
    Unexpected error: TypeError('no default __reduce__ due to non-trivial __cinit__')
    Traceback (most recent call last):
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 900, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/results/local_result.py", line 116, in write
        value = self.serializer.serialize(new.value)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
      File "stringsource", line 2, in pymssql._mssql.MSSQLConnection.__reduce_cython__
    TypeError: no default __reduce__ due to non-trivial __cinit__
    I tried running on both Ubuntu 20.04.02 and Windows 10. Both can run the flow locally. I even tried Docker and got the same error. I am using the Python package pymssql to extract data from the database and petl to do some basic transformations. It works fine when I run the flow without any agent or Cloud: it extracts the data and does the transformations. I figure it has something to do with how data is passed between functions.
    I have now also tried switching out pymssql for pyodbc, and the new error is similar but clearer.
    Unexpected error: TypeError("cannot pickle 'pyodbc.Connection' object")
    Kevin Kho

    1 year ago
    Hey @PKay, typically you only want one of `flow.run` or `flow.register`: `flow.run` for local testing and `flow.register` for backend usage. You also should not need `flow.run_agent`. Assuming you're using the local agent, you can try starting it from the CLI with `prefect agent local start`. I re-read your message, and I think you're using `flow.run` and `flow.register` correctly.
    But this error specifically is related to task outputs not being serializable, I think. To distribute tasks to Dask workers, Prefect needs their outputs to be serializable by `cloudpickle`. Typically this happens if you return an ODBC connection from one task and feed it into the next task so that task can use the connection. You can try initializing the connection and using it inside the same task.
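    A minimal sketch of that last suggestion, using the standard library's sqlite3 as a stand-in for pymssql or pyodbc so it runs anywhere. The function name `fetch_rows` and the `product` table are hypothetical. The idea is to open, use, and close the connection inside one function and return only plain rows, which pickle without trouble, while the connection object itself does not.

```python
import pickle
import sqlite3


def fetch_rows(db_path=":memory:"):
    """Open the connection, run the query, and close it inside one function."""
    conn = sqlite3.connect(db_path)
    try:
        # Hypothetical table and data, created here only so the sketch is runnable.
        conn.execute("CREATE TABLE IF NOT EXISTS product (col TEXT)")
        conn.execute("INSERT INTO product VALUES ('a'), ('b')")
        # fetchall() copies the result into a plain list of tuples.
        return conn.execute("SELECT col FROM product ORDER BY col").fetchall()
    finally:
        conn.close()


# Plain rows survive a pickle round trip.
rows = fetch_rows()
restored = pickle.loads(pickle.dumps(rows))

# A live connection object does not.
conn = sqlite3.connect(":memory:")
try:
    pickle.dumps(conn)
    connection_pickled = True
except TypeError:
    connection_pickled = False
finally:
    conn.close()
```

    In a Prefect flow, the body of `fetch_rows` would sit inside a single task, so only the list of rows is handed to the next task.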
    PKay

    1 year ago
    @Kevin Kho yeah, I just wanted to run the whole process in one go, which is not optimal 🙂 Thanks for the odbc tip. For reference, it looks like this is something that petl does. I'll look for ways to change it.
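    import petl as etl
    import pymssql

    def fetch_product_data():
        ''' Extract product data from SQL '''
        # config is the settings mapping from the original flow (not shown here)
        conn = pymssql.connect(config.get('DB_SERVER'), config.get('DB_USER'),
                               config.get('DB_PASSWORD'), config.get('DB'))
        table = etl.fromdb(conn, 'SELECT col FROM table')

        # will not work: the returned table still references conn, which cannot be pickled
        return table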
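    A rough illustration of why the returned table fails, assuming that `etl.fromdb` returns a lazy view that keeps a reference to `conn` and only runs the query when iterated. `LazyQueryView` below is a hypothetical stand-in for that view, not petl's actual class, and sqlite3 replaces pymssql so the sketch runs without a database server. Materializing the rows while the connection is open gives something that can be pickled.

```python
import pickle
import sqlite3


class LazyQueryView:
    """Hypothetical lazy table: keeps the connection and runs the query on iteration."""

    def __init__(self, conn, query):
        self.conn = conn
        self.query = query

    def __iter__(self):
        return iter(self.conn.execute(self.query))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (col TEXT)")
conn.execute("INSERT INTO product VALUES ('a'), ('b')")

view = LazyQueryView(conn, "SELECT col FROM product ORDER BY col")

# Pickling the view also tries to pickle the connection it holds, which fails.
try:
    pickle.dumps(view)
    view_pickled = True
except (TypeError, pickle.PicklingError):
    view_pickled = False

# Materialize the rows while the connection is still open, then close it.
rows = list(view)
conn.close()

# The materialized rows are plain data and pickle cleanly.
rows_roundtrip = pickle.loads(pickle.dumps(rows))
```

    With petl itself, the equivalent step would be to turn the table into plain Python data inside the task before returning it, rather than returning the view.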
    Kevin Kho

    1 year ago
    Some users can get away with this by using `LocalExecutor`, so nothing has to be sent to Dask workers. You may also have to store the flow as a script if you want to do it this way.