Hello, I am currently trying to register a flow to...
# prefect-server
j
Hello, I am currently trying to register a flow to a local prefect server, but I ran into the following issue:
Copy code
TypeError: cannot pickle 'weakref' object (running this with flow.run worked fine)
I narrowed it down to the use of a sqlalchemy session object I use to send and retrieve data from a database and that the object can't be serialized. Now I'm a bit clueless as to how I should solve this because I'm unsure with what steps to take next.
Copy code
from prefect import task, Flow,case, Parameter
from sqlalchemy import orm
from sqlalchemy import orm

engine = create_engine("mysql+mysqldb://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**json.load(f)))
session = orm.Session(bind=engine)

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def commit_to_db(object_list: List):

    session.add_all(object_list)
    try:
        session.commit()

    except exc.SQLAlchemyError as error:
        session.rollback()
        #logger.warning(error)
        raise Exception(error)

    session.close()
k
Hey @Jonas, task inputs and outputs need to be serializeable. In this though, I think the problem is from using the session like this. Could you try moving
engine
and
session
to be created inside the task?
j
With running it in the task itself I get the following error
Copy code
TypeError: cannot pickle '_thread._local' object
k
Are you using Dask in your Flow?
j
no, but the engine variable calls another sqlalchemy create_engine object from another file
could this be part of the problem?
I threw everything together inside the task to create the session and now it works
however this doesn't help with making the code look clean
thanks for pointing me in the right direction @Kevin Kho 🙂
k
So the requirement is there because tasks need to be serializeable by
cloudpickle
in order to move them to workers. But if you are just using a LocalExecutor, you can store your Flow as a script so that there is no serialization and I think this might work.
flow.storage = Local(…, stored_as_script=True)
j
will this also work with prefect server?
k
You can check this for more info. This is not recommended as it will break when Dask gets into the mix, but I think it will work the same for both Cloud and Server.