Thread
#prefect-server
    Jonas
    1 year ago
    Hello, I am currently trying to register a flow to a local Prefect Server, but I ran into the following issue:
    TypeError: cannot pickle 'weakref' object (running this with flow.run worked fine)
    I narrowed it down to the SQLAlchemy session object I use to send and retrieve data from a database: that object can't be serialized. Now I'm a bit clueless about how to solve this and unsure what steps to take next.
    import json
    from datetime import timedelta
    from typing import List

    from prefect import task, Flow, case, Parameter
    from sqlalchemy import create_engine, exc, orm

    # f: open file handle for the JSON connection config (not shown)
    engine = create_engine("mysql+mysqldb://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**json.load(f)))
    session = orm.Session(bind=engine)

    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def commit_to_db(object_list: List):
        session.add_all(object_list)
        try:
            session.commit()
        except exc.SQLAlchemyError as error:
            session.rollback()
            # logger.warning(error)
            raise Exception(error)
        session.close()
    Kevin Kho
    1 year ago
    Hey @Jonas, task inputs and outputs need to be serializable. In this case though, I think the problem is from using the session like this. Could you try moving engine and session to be created inside the task?
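    For illustration, a minimal sketch of that suggestion; the config file name db_config.json is an assumption, not something from the thread. The engine and session are built at task run time, so nothing unpicklable has to be pickled when the flow is registered:
    import json
    from datetime import timedelta
    from typing import List

    from prefect import task
    from sqlalchemy import create_engine, exc, orm

    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def commit_to_db(object_list: List):
        # Build the engine and session inside the task so they never need
        # to be serialized along with the flow.
        with open("db_config.json") as f:  # config file name is assumed
            engine = create_engine(
                "mysql+mysqldb://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**json.load(f))
            )
        session = orm.Session(bind=engine)

        session.add_all(object_list)
        try:
            session.commit()
        except exc.SQLAlchemyError as error:
            session.rollback()
            raise Exception(error)
        finally:
            session.close()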
    Jonas
    1 year ago
    When running it inside the task itself I get the following error:
    TypeError: cannot pickle '_thread._local' object
    Kevin Kho
    1 year ago
    Are you using Dask in your Flow?
    Jonas
    1 year ago
    No, but the engine variable calls another SQLAlchemy create_engine object from another file.
    Could this be part of the problem?
    I threw everything together inside the task to create the session and now it works.
    However, this doesn't help with making the code look clean.
    Thanks for pointing me in the right direction @Kevin Kho 🙂
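    One way to keep the task body tidy while still creating the session inside the task is a small helper like the hypothetical get_session below; the helper name and config file are assumptions for illustration, not from the thread:
    import json
    from contextlib import contextmanager
    from datetime import timedelta

    from prefect import task
    from sqlalchemy import create_engine, orm

    @contextmanager
    def get_session(config_path="db_config.json"):
        # The engine and session are created only when a task calls this
        # helper, so nothing unpicklable exists at module import time.
        with open(config_path) as f:
            engine = create_engine(
                "mysql+mysqldb://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**json.load(f))
            )
        session = orm.Session(bind=engine)
        try:
            yield session
        finally:
            session.close()

    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def commit_to_db(object_list):
        with get_session() as session:
            session.add_all(object_list)
            session.commit()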
    Kevin Kho
    1 year ago
    So the requirement is there because tasks need to be serializable by cloudpickle in order to move them to workers. But if you are just using a LocalExecutor, you can store your Flow as a script so that there is no serialization, and I think this might work.
    flow.storage = Local(…, stored_as_script=True)
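    As a rough sketch of what that could look like when registering; the flow name, path, and project name below are assumptions, not values from the thread:
    from prefect import Flow
    from prefect.storage import Local

    with Flow("commit-objects") as flow:  # flow name is assumed
        ...  # build the flow as usual

    # Stored as a script: registration records the path to this file instead
    # of pickling the flow object, so unpicklable globals are not a problem.
    flow.storage = Local(path="/path/to/flow_file.py", stored_as_script=True)
    flow.register(project_name="my-project")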
    Jonas
    1 year ago
    Will this also work with Prefect Server?
    Kevin Kho
    1 year ago
    You can check this for more info. This is not recommended as it will break when Dask gets into the mix, but I think it will work the same for both Cloud and Server.