Faheem Khan
07/14/2022, 8:54 AM
_pickle.PicklingError: Can't pickle <function db_con at 0x7f85c986f880>: it's not the same object as __main__.db_con
I get this error when I run a task that queries a database. The code runs fine without that DB connection task.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base class; ORM models subclass it to register their tables.
Base = declarative_base()
class Customers(Base):  # _TEST
    """ORM model mapped to the ``records`` table."""

    __tablename__ = 'records'
    # Primary key, stored as a string.
    id = Column(String, primary_key=True)
    # Integer column; presumably a download timestamp — TODO confirm.
    dt_ut = Column(Integer)
# Create the tables for every model registered on Base.
# NOTE(review): `engine` is not defined in this excerpt — presumably it is
# created earlier with create_engine(); confirm.
Base.metadata.create_all(engine)
# Session factory bound to that engine.
Session = sessionmaker(bind = engine)
# Module-level session shared by the query helpers that follow.
session = Session()
def ConvertToDict(a):
    """Pair up consecutive items of *a* into a dictionary.

    Items at even positions become keys and the item that follows each one
    becomes its value, e.g. ``[k1, v1, k2, v2] -> {k1: v1, k2: v2}``.
    A trailing item without a partner is dropped.
    """
    it = iter(a)
    # Zipping one iterator with itself consumes two items per pair.
    return dict(zip(it, it))
# Query a db table and convert it to a dictionary
def query_db(x):
    """Return a ``{lid: dtUtDownload}`` dictionary for every row of model *x*.

    *x* must expose ``lid`` and ``dtUtDownload`` column attributes. The query
    runs on the module-level ``session``.
    """
    qur = session.query(x.lid, x.dtUtDownload).all()
    # Flatten the (lid, dtUtDownload) rows into one alternating list so that
    # ConvertToDict can pair them back up as key/value.
    qur1 = list(itertools.chain.from_iterable(qur))
    return ConvertToDict(qur1)
@task
def db_con():
    res = query_db(LRD)
Anna Geller
07/14/2022, 10:39 AM
Faheem Khan
07/14/2022, 10:54 PM
@flow(task_runner=DaskTaskRunner())
def C():
    """Flow body: invoke the ``db_con`` task once."""
    a1 = db_con()
Anna Geller
07/15/2022, 1:29 AM
Faheem Khan
07/15/2022, 3:18 AM
from pathlib import *
from prefect import task, flow
from prefect import get_run_logger
from prefect.blocks.storage import FileStorageBlock
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
from prefect.task_runners import SequentialTaskRunner, DaskTaskRunner
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, VARCHAR, text
from sqlalchemy.engine.url import URL
from sqlalchemy.dialects.postgresql import insert
import sqlalchemy
import itertools
# Connection URL for the PostgreSQL database; the positional arguments are
# drivername, username, password, host, port, database.
# NOTE(review): the credentials are hard-coded here — consider loading them
# from configuration or the environment instead.
url = URL.create('postgresql', 'admin', '123', 'localhost', '5432', 'postgres_db')
# echo = False keeps SQLAlchemy from logging every emitted SQL statement.
engine = create_engine(url, echo = False)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base class; ORM models subclass it to register their tables.
Base = declarative_base()
class LRD(Base):
    """ORM model mapped to the ``LastRecordedDate`` table.

    All columns are strings; ``lid`` is the primary key.
    """

    __tablename__ = 'LastRecordedDate'
    lid = Column(String, primary_key=True)
    dtUtDownload = Column(String)
    DtUtProcessed = Column(String)
    Tmpcol = Column(String)
# Create the tables for every model registered on Base; this runs whenever
# the module is imported.
Base.metadata.create_all(engine)
# Session factory bound to the engine above.
Session = sessionmaker(bind = engine)
# Single module-level session used by query_db.
# NOTE(review): this session is created at import time and shared by every
# task; if tasks are shipped to Dask workers it may be what fails to pickle —
# consider creating the session inside the task instead. TODO confirm.
session = Session()
def ConvertToDict(a):
    """Build a dictionary from a flat, alternating key/value sequence.

    ``[k1, v1, k2, v2]`` becomes ``{k1: v1, k2: v2}``. If *a* has an odd
    number of items, the last one has no value and is ignored.
    """
    it = iter(a)
    # The same iterator is passed twice, so each zip step takes a key and
    # then the value that follows it.
    return dict(zip(it, it))
# Query a db table and convert it to a dictionary
def query_db(x):
    """Return a ``{lid: dtUtDownload}`` dictionary for every row of model *x*.

    *x* must expose ``lid`` and ``dtUtDownload`` column attributes (as ``LRD``
    does). The query runs on the module-level ``session``.
    """
    qur = session.query(x.lid, x.dtUtDownload).all()
    # Flatten the (lid, dtUtDownload) rows into one alternating list so that
    # ConvertToDict can pair them back up as key/value.
    qur1 = list(itertools.chain.from_iterable(qur))
    return ConvertToDict(qur1)
@task
def db_con():
    """Task: load the ``LRD`` table as a ``{lid: dtUtDownload}`` dictionary.

    Prints the dictionary and returns it so that the calling flow can read
    it from the task's result.
    """
    # NOTE(review): query_db relies on the module-level `session`; with
    # DaskTaskRunner it may be necessary to create the session inside this
    # task rather than at import time — TODO confirm.
    LRD_dict = query_db(LRD)
    print(LRD_dict)
    # Previously nothing was returned, so the flow had no result to print.
    return LRD_dict
@flow(task_runner=DaskTaskRunner())
def main1():
    """Flow: run the ``db_con`` task on the Dask task runner and print its result."""
    a = db_con()
    print(a.result())
# Run the flow only when this file is executed directly as a script.
if __name__ == "__main__":
    main1()
# Deployment definition for the main1 flow, using a Docker flow runner.
# Kept at module level so it is evaluated whenever this file is loaded.
DeploymentSpec(
    name="downloader_dask_01",
    flow=main1,
    flow_runner=DockerFlowRunner(),
)
Connections can't be pickled at the moment.
Try moving the connection into the task that needs it, without passing the connection between tasks. (edited)
What I have found so far is that any DB connection in a task cannot be pickled. I tried with both SQLAlchemy and Pony, and both give me the same error.
Anna Geller
07/17/2022, 11:05 AM
Faheem Khan
07/17/2022, 11:14 PM
scan dir >> find files >> check DB whether the file has been downloaded (Y/N) >> download the files >> update DB with the latest download date per folder, e.g. 1004 18-07-2022
Anna Geller
07/18/2022, 1:19 AM