Hi all! I'm on Prefect 2.0b7 with DockerFlowRunner and DaskTaskRunner, and I am getting the following error:
_pickle.PicklingError: Can't pickle <function db_con at 0x7f85c986f880>: it's not the same object as __main__.db_con
when I run a task that queries a database. The code runs fine without that db connection task.
I am using SQLAlchemy with sessionmaker for the db connection:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Customers(Base): #_TEST
    __tablename__ = 'records'
    id = Column(String, primary_key=True)
    dt_ut = Column(Integer)

Session = sessionmaker(bind=engine)
session = Session()

def ConvertToDict(a):
    it = iter(a)
    res_dct = dict(zip(it, it))
    return res_dct

# Query db table and convert it to dictionary
def query_db(x):
    qur = session.query(x.lid, x.dtUtDownload).all()
    qur1 = list(itertools.chain(*qur))
    return ConvertToDict(qur1)

def db_con():
    res = query_db(LRD)
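A side note on ConvertToDict: it pairs consecutive items of a flat list into key/value pairs. Here is a minimal sketch of what it does, assuming each query row is a 2-tuple of (id, date). The sample rows and the snake_case function name are made up for illustration.

```python
import itertools

def convert_to_dict(flat):
    """Pair consecutive items: [k1, v1, k2, v2] -> {k1: v1, k2: v2}."""
    it = iter(flat)
    # zip pulls two items at a time from the same iterator.
    return dict(zip(it, it))

# Hypothetical rows, shaped like the (lid, dtUtDownload) tuples a query returns.
rows = [("1004", "18-07-2022"), ("1005", "17-07-2022")]

# The approach from the snippet: flatten the rows, then pair them up again.
flattened = list(itertools.chain(*rows))
via_zip = convert_to_dict(flattened)

# For 2-tuples, dict() accepts the rows directly, so flattening is optional.
direct = dict(rows)
```

If the rows really are 2-tuples, both routes give the same dictionary, so the flatten-and-pair step could probably be dropped.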
Connections can't be pickled atm. Try moving the connection into the task that needs it, without passing the connection between tasks.
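A minimal sketch of that advice, using the standard-library sqlite3 module as a stand-in for the PostgreSQL/SQLAlchemy setup. The table name, file path and helper names are assumptions for illustration. The function opens its own connection and returns only plain data, so nothing unpicklable has to travel between tasks.

```python
import os
import pickle
import sqlite3
import tempfile

# Hypothetical database file standing in for the real PostgreSQL instance.
DB_PATH = os.path.join(tempfile.mkdtemp(), "records.db")

def seed_db(path):
    """Create a tiny table so the example has something to query."""
    conn = sqlite3.connect(path)
    try:
        conn.execute("CREATE TABLE last_recorded (lid TEXT PRIMARY KEY, dt TEXT)")
        conn.execute("INSERT INTO last_recorded VALUES ('1004', '18-07-2022')")
        conn.commit()
    finally:
        conn.close()

def query_db(path):
    """Open the connection inside the function and return a plain dict."""
    conn = sqlite3.connect(path)
    try:
        rows = conn.execute("SELECT lid, dt FROM last_recorded").fetchall()
        return dict(rows)
    finally:
        conn.close()

seed_db(DB_PATH)
result = query_db(DB_PATH)

# The result is plain data and survives a pickle round trip.
round_tripped = pickle.loads(pickle.dumps(result))

# A live connection object, by contrast, cannot be pickled.
conn = sqlite3.connect(DB_PATH)
try:
    pickle.dumps(conn)
    connection_pickled = True
except TypeError:
    connection_pickled = False
finally:
    conn.close()
```

With SQLAlchemy the same idea would mean creating the engine and session inside the task body rather than at module level, and returning the resulting dict instead of the session or any ORM objects.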
@Anna Geller still the same error. I created a separate task for querying the db.
def C():
could you build a minimal example I could use to reproduce the same error?
from pathlib import *
from prefect import task, flow
from prefect import get_run_logger
from prefect.blocks.storage import FileStorageBlock
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
from prefect.task_runners import SequentialTaskRunner, DaskTaskRunner
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, VARCHAR, text
from sqlalchemy.engine.url import URL
from sqlalchemy.dialects.postgresql import insert
import sqlalchemy
import itertools
url = URL.create('postgresql', 'admin', '123', 'localhost', '5432', 'postgres_db')
engine = create_engine(url, echo = False)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class LRD(Base):
    __tablename__ = 'LastRecordedDate'
    lid = Column(String, primary_key=True)
    dtUtDownload = Column(String)
    DtUtProcessed = Column(String)
    Tmpcol = Column(String)

Session = sessionmaker(bind=engine)
session = Session()

def ConvertToDict(a):
    it = iter(a)
    res_dct = dict(zip(it, it))
    return res_dct

# Query db table and convert it to dictionary
def query_db(x):
    qur = session.query(x.lid, x.dtUtDownload).all()
    qur1 = list(itertools.chain(*qur))
    return ConvertToDict(qur1)

def db_con():
    LRD_dict = query_db(LRD)
def main1():
if __name__ == "__main__":
@Anna Geller I have had a successful run in plain Python, without Prefect.
@Anna Geller
What I have found so far is that any db connection in a task cannot be pickled. I tried with both SQLAlchemy and Pony, and both give me the same error.
Tried Prefect 2.0b8, same result.
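For reference, the "it's not the same object as" wording comes from Python's pickle, which stores a plain function by its module and name and then checks that the name still points at that same function. Below is a minimal sketch of one way to trigger it in plain Python. The Wrapper class is a hypothetical stand-in for any decorator that rebinds the function name, and this is not confirmed to be what happens in the flow above.

```python
import pickle

def db_con():
    # Placeholder body; the real function would query the database.
    return {"1004": "18-07-2022"}

# Keep a reference to the original function object.
original = db_con

class Wrapper:
    """Hypothetical stand-in for a decorator that returns a wrapper object."""
    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)

# Rebinding the name is what applying a decorator does: the module-level
# name db_con now refers to the Wrapper instance, not the original function.
db_con = Wrapper(db_con)

try:
    # pickle looks up "db_con" in the defining module, finds a different
    # object than the one being pickled, and refuses.
    pickle.dumps(original)
    error = None
except pickle.PicklingError as exc:
    error = str(exc)
```

If this is related, it would explain why the error names the function db_con rather than a connection or session object, but that is only a guess from the message text.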
Why are you using Dask here? Try without Dask, this will likely help fix your issue. You're not running anything in parallel here, so Dask causes only unnecessary trouble atm :)
@Anna Geller that's only one section of the code. The original code scans nested directories, then downloads and processes the files. A Postgres database keeps track of the files. The snapshot below shows the parent dir. I am running a for loop over a list of the directory names to download the files (in parallel).
My code works like this:
scan dir >> find files >> check db whether the file is downloaded (Y/N) >> download the files >> update db with the latest download date per folder, e.g. 1004 18-07-2022
Before Prefect I was using Python multiprocessing, which would spawn a new process per folder name (e.g. 1004), limited by the number of processors: with 4 processors there would be 4 processes running, each following the same logic I explained above.
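The per-folder logic above can be sketched so that each worker opens its own db connection and hands back only plain data. This is a rough sketch under assumptions: sqlite3 stands in for Postgres, the folder names and table are made up, the download step is a placeholder, and a thread pool is used so the example runs anywhere.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor

# Hypothetical tracking database and folder names.
DB_PATH = os.path.join(tempfile.mkdtemp(), "tracking.db")
FOLDERS = ["1004", "1005", "1006", "1007"]

def init_db(path):
    """Create the tracking table with one folder already downloaded."""
    conn = sqlite3.connect(path)
    try:
        conn.execute("CREATE TABLE last_download (folder TEXT PRIMARY KEY, dt TEXT)")
        conn.execute("INSERT INTO last_download VALUES ('1004', '18-07-2022')")
        conn.commit()
    finally:
        conn.close()

def process_folder(folder, path=DB_PATH):
    """Check the db for one folder using a connection owned by this worker."""
    conn = sqlite3.connect(path)
    try:
        row = conn.execute(
            "SELECT dt FROM last_download WHERE folder = ?", (folder,)
        ).fetchone()
    finally:
        conn.close()
    already_downloaded = row is not None
    # Placeholder: the real code would download and process new files here.
    return folder, already_downloaded

init_db(DB_PATH)

# One worker per folder, at most 4 at a time. Only the folder name goes in
# and a plain tuple comes out, so no connection crosses a worker boundary.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = dict(pool.map(process_folder, FOLDERS))
```

ProcessPoolExecutor offers the same map interface, so it should be a close swap for the multiprocessing setup described above, provided the pool is started under an `if __name__ == "__main__":` guard.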