Jonas
07/23/2021, 1:44 PMTypeError: cannot pickle 'weakref' object (running this with flow.run worked fine)
I narrowed it down to the use of a sqlalchemy session object I use to send and retrieve data from a database and that the object can't be serialized.
Now I'm a bit clueless as to how I should solve this because I'm unsure with what steps to take next.
from prefect import task, Flow,case, Parameter
from sqlalchemy import orm
from sqlalchemy import orm
engine = create_engine("mysql+mysqldb://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**json.load(f)))
session = orm.Session(bind=engine)
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def commit_to_db(object_list: List):
session.add_all(object_list)
try:
session.commit()
except exc.SQLAlchemyError as error:
session.rollback()
#logger.warning(error)
raise Exception(error)
session.close()
Kevin Kho
07/23/2021, 1:46 PMengine
and session
to be created inside the task?Jonas
07/23/2021, 1:48 PMTypeError: cannot pickle '_thread._local' object
Kevin Kho
07/23/2021, 1:52 PMJonas
07/23/2021, 1:53 PMKevin Kho
07/23/2021, 1:57 PMcloudpickle
in order to move them to workers. But if you are just using a LocalExecutor, you can store your Flow as a script so that there is no serialization and I think this might work.flow.storage = Local(…, stored_as_script=True)
Jonas
07/23/2021, 1:57 PMKevin Kho
07/23/2021, 1:58 PM